Skip to content
Closed
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 71 additions & 69 deletions plugwise_usb/nodes/circle.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
)
from ..connection import StickController
from ..constants import (
DAY_IN_HOURS,
DEFAULT_CONS_INTERVAL,
MAX_LOG_HOURS,
MAX_TIME_DRIFT,
MINIMAL_POWER_UPDATE,
NO_PRODUCTION_INTERVAL,
Expand Down Expand Up @@ -364,7 +366,7 @@ async def energy_update(self) -> EnergyStatistics | None: # noqa: PLR0911 PLR09

# Always request last energy log records at initial startup
if not self._last_energy_log_requested:
self._last_energy_log_requested, _ = await self.energy_log_update(
self._last_energy_log_requested = await self.energy_log_update(
self._current_log_address
)

Expand All @@ -378,7 +380,7 @@ async def energy_update(self) -> EnergyStatistics | None: # noqa: PLR0911 PLR09
return None

# Try collecting energy-stats for _current_log_address
result, _ = await self.energy_log_update(self._current_log_address)
result = await self.energy_log_update(self._current_log_address)
if not result:
_LOGGER.debug(
"async_energy_update | %s | Log rollover | energy_log_update failed",
Expand All @@ -392,7 +394,7 @@ async def energy_update(self) -> EnergyStatistics | None: # noqa: PLR0911 PLR09
_prev_log_address, _ = calc_log_address(
self._current_log_address, 1, -4
)
result, _ = await self.energy_log_update(_prev_log_address)
result = await self.energy_log_update(_prev_log_address)
if not result:
_LOGGER.debug(
"async_energy_update | %s | Log rollover | energy_log_update %s failed",
Expand All @@ -413,7 +415,7 @@ async def energy_update(self) -> EnergyStatistics | None: # noqa: PLR0911 PLR09
return self._energy_counters.energy_statistics

if len(missing_addresses) == 1:
result, _ = await self.energy_log_update(missing_addresses[0])
result = await self.energy_log_update(missing_addresses[0])
if result:
await self.power_update()
_LOGGER.debug(
Expand Down Expand Up @@ -462,54 +464,16 @@ async def _get_initial_energy_logs(self) -> None:
)
total_addresses = 11
log_address = self._current_log_address
prev_address_timestamp: datetime | None = None
while total_addresses > 0:
result, empty_log = await self.energy_log_update(log_address)
if result and empty_log:
result = await self.energy_log_update(log_address)
if not result:
# Handle case with None-data in all address slots
_LOGGER.debug(
"Energy None-data collected from log address %s, stopping collection",
"All slots at log address %s are empty or outdated – stopping initial collection",
log_address,
)
break

Comment thread
coderabbitai[bot] marked this conversation as resolved.
# Check if the most recent timestamp of an earlier address is recent
# (within 2/4 * log_interval plus 5 mins margin)
log_interval = self.energy_consumption_interval
factor = 2 if self.energy_production_interval is not None else 4

if log_interval is not None:
max_gap_minutes = (factor * log_interval) + 5
if log_address == self._current_log_address:
if (
self._last_collected_energy_timestamp is not None
and (
datetime.now(tz=UTC) - self._last_collected_energy_timestamp
).total_seconds()
// 60
> max_gap_minutes
):
_LOGGER.debug(
"Energy data collected from the current log address is outdated, stopping collection"
)
break
elif (
prev_address_timestamp is not None
and self._last_collected_energy_timestamp is not None
and (
prev_address_timestamp - self._last_collected_energy_timestamp
).total_seconds()
// 60
> max_gap_minutes
):
_LOGGER.debug(
"Collected energy data is outdated, stopping collection"
)
break

if self._last_collected_energy_timestamp is not None:
prev_address_timestamp = self._last_collected_energy_timestamp

log_address, _ = calc_log_address(log_address, 1, -4)
total_addresses -= 1

Expand Down Expand Up @@ -544,30 +508,27 @@ async def get_missing_energy_logs(self) -> None:
if self._cache_enabled:
await self._energy_log_records_save_to_cache()

async def energy_log_update(self, address: int | None) -> tuple[bool, bool]:
"""Request energy log statistics from node. Returns true if successful."""
empty_log = False
result = False
async def energy_log_update(self, address: int | None) -> bool:
"""Request energy logs and return True only when at least one recent, non-empty record was stored; otherwise return False."""
any_record_stored = False
if address is None:
return result, empty_log
return False

_LOGGER.debug(
"Request of energy log at address %s for node %s",
str(address),
self.name,
"Requesting EnergyLogs from node %s address %s",
self._mac_in_str,
address,
)
request = CircleEnergyLogsRequest(self._send, self._mac_in_bytes, address)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
if (response := await request.send()) is None:
_LOGGER.debug(
"Retrieving of energy log at address %s for node %s failed",
str(address),
"Retrieving EnergyLogs data from node %s failed",
self._mac_in_str,
)
return result, empty_log
return False

_LOGGER.debug("EnergyLogs data from %s, address=%s", self._mac_in_str, address)
_LOGGER.debug("EnergyLogs from node %s, address=%s:", self._mac_in_str, address)
await self._available_update_state(True, response.timestamp)
energy_record_update = False

# Forward historical energy log information to energy counters
# Each response message contains 4 log counters (slots) of the
Expand All @@ -578,17 +539,23 @@ async def energy_log_update(self, address: int | None) -> tuple[bool, bool]:
_LOGGER.debug(
"In slot=%s: pulses=%s, timestamp=%s", _slot, log_pulses, log_timestamp
)
if log_timestamp is None or log_pulses is None:
if (
log_timestamp is None
or log_pulses is None
# Don't store an old log-record, store am empty record instead
or not self._check_timestamp_is_recent(address, _slot, log_timestamp)
):
self._energy_counters.add_empty_log(response.log_address, _slot)
empty_log = True
elif await self._energy_log_record_update_state(
continue

if await self._energy_log_record_update_state(
response.log_address,
_slot,
log_timestamp.replace(tzinfo=UTC),
log_pulses,
import_only=True,
):
energy_record_update = True
any_record_stored = True
if not last_energy_timestamp_collected:
Comment thread
coderabbitai[bot] marked this conversation as resolved.
# Collect the timestamp of the most recent response
self._last_collected_energy_timestamp = log_timestamp.replace(
Expand All @@ -600,15 +567,35 @@ async def energy_log_update(self, address: int | None) -> tuple[bool, bool]:
)
last_energy_timestamp_collected = True

result = True
self._energy_counters.update()
if energy_record_update:
if any_record_stored:
_LOGGER.debug(
"Saving energy record update to cache for %s", self._mac_in_str
)
await self.save_cache()

return result, empty_log
return any_record_stored

def _check_timestamp_is_recent(
self, address: int, slot: int, timestamp: datetime
) -> bool:
"""Check if the timestamp of the received log-record is recent.

A timestamp from within the last 24 hours is considered recent.
"""
age_seconds = (
datetime.now(tz=UTC) - timestamp.replace(tzinfo=UTC)
).total_seconds()
if age_seconds > DAY_IN_HOURS * 3600:
_LOGGER.warning(
Comment thread
coderabbitai[bot] marked this conversation as resolved.
"EnergyLog from Node %s | address %s | slot %s | timestamp %s is outdated, ignoring...",
self._mac_in_str,
address,
slot,
timestamp,
)
return False
return True

async def _energy_log_records_load_from_cache(self) -> bool:
"""Load energy_log_record from cache."""
Expand Down Expand Up @@ -649,6 +636,20 @@ async def _energy_log_records_load_from_cache(self) -> bool:
restored_logs[address] = []
restored_logs[address].append(slot)

# Sort and prune the records loaded from cache
sorted_logs: dict[int, dict[int, PulseLogRecord]] = {}
skip_before = datetime.now(tz=UTC) - timedelta(hours=MAX_LOG_HOURS)
sorted_address = sorted(restored_logs.keys(), reverse=True)
for address in sorted_address:
sorted_slots = sorted(restored_logs[address].keys(), reverse=True)
for slot in sorted_slots:
if restored_logs[address][slot].timestamp > skip_before:
if sorted_log.get(address) is None:
sorted_log[address] = {}
sorted_log[address][slot] = restored_logs[address][slot]

restored_logs = sorted_logs

Comment thread
coderabbitai[bot] marked this conversation as resolved.
self._energy_counters.update()

# Create task to retrieve remaining (missing) logs
Expand Down Expand Up @@ -677,9 +678,10 @@ async def _energy_log_records_save_to_cache(self) -> None:
self._energy_counters.get_pulse_logs()
)
cached_logs = ""
for address in sorted(logs.keys(), reverse=True):
for slot in sorted(logs[address].keys(), reverse=True):
log = logs[address][slot]
# logs is already sorted in reverse
for address, record in logs.items():
for slot in record:
log = record[slot]
if cached_logs != "":
cached_logs += "|"
cached_logs += f"{address}:{slot}:{log.timestamp.year}"
Expand Down Expand Up @@ -718,7 +720,7 @@ async def _energy_log_record_update_state(
self._mac_in_str,
)
self._set_cache(
CACHE_ENERGY_COLLECTION, cached_logs + "|" + log_cache_record
CACHE_ENERGY_COLLECTION, log_cache_record + "|" + cached_logs
)
return True

Expand Down
Loading