diff --git a/apps/predbat/axle.py b/apps/predbat/axle.py index a1274c06e..55eacee71 100644 --- a/apps/predbat/axle.py +++ b/apps/predbat/axle.py @@ -377,11 +377,21 @@ def load_axle_slot(base, axle_sessions, export, rate_replicate={}): if export: base.rate_export[minute] = base.rate_export.get(minute, 0) + pence_per_kwh rate_replicate[minute] = "saving" + + # Track Axle override in rate store + if base.rate_store: + today = datetime.now() + base.rate_store.update_auto_override(today, minute, None, base.rate_export[minute], "Axle") else: base.rate_import[minute] = base.rate_import.get(minute, 0) + pence_per_kwh base.load_scaling_dynamic[minute] = base.load_scaling_saving rate_replicate[minute] = "saving" + # Track Axle override in rate store + if base.rate_store: + today = datetime.now() + base.rate_store.update_auto_override(today, minute, base.rate_import[minute], None, "Axle") + def fetch_axle_active(base): """ diff --git a/apps/predbat/config.py b/apps/predbat/config.py index fa7d9076e..8fac7dd74 100644 --- a/apps/predbat/config.py +++ b/apps/predbat/config.py @@ -826,6 +826,18 @@ "type": "switch", "default": True, }, + { + "name": "rate_retention_days", + "friendly_name": "Rate Retention Days", + "type": "input_number", + "min": 1, + "max": 365, + "step": 1, + "unit": "days", + "icon": "mdi:database-clock", + "enable": "expert_mode", + "default": 7, + }, { "name": "set_charge_freeze", "friendly_name": "Set Charge Freeze", @@ -1437,6 +1449,7 @@ "days_previous": True, "days_previous_weight": True, "battery_scaling": True, + "rate_retention_days": True, "forecast_hours": True, "import_export_scaling": True, "inverter_limit_charge": True, @@ -2073,6 +2086,7 @@ "rates_export_override": {"type": "dict_list"}, "days_previous": {"type": "integer_list"}, "days_previous_weight": {"type": "float_list"}, + "rate_retention_days": {"type": "integer"}, "forecast_hours": {"type": "integer"}, "notify_devices": {"type": "string_list"}, "battery_scaling": {"type": "sensor_list", 
"sensor_type": "float", "entries": "num_inverters", "modify": False}, diff --git a/apps/predbat/fetch.py b/apps/predbat/fetch.py index 6b18e8ec8..7cd3c43e5 100644 --- a/apps/predbat/fetch.py +++ b/apps/predbat/fetch.py @@ -957,6 +957,23 @@ def fetch_sensor_data(self, save=True): if self.rate_import: self.rate_scan(self.rate_import, print=False) self.rate_import, self.rate_import_replicated = self.rate_replicate(self.rate_import, self.io_adjusted, is_import=True) + + # Persist base import rates to storage (only non-replicated/non-override data) + if self.rate_store: + today = datetime.now() + for minute in self.rate_import: + # Only persist true API data, not replicated or override data + if minute not in self.rate_import_replicated or self.rate_import_replicated[minute] == "got": + # Get corresponding export rate or use 0 + export_rate = self.rate_export.get(minute, 0) if self.rate_export else 0 + self.rate_store.write_base_rate(today, minute, self.rate_import[minute], export_rate) + + # Rehydrate finalized rates from storage - these take priority over fresh API data + for minute in range(0, self.minutes_now): + finalized_rate = self.rate_store.get_rate(today, minute, is_import=True) + if finalized_rate is not None: + self.rate_import[minute] = finalized_rate + self.rate_import_no_io = self.rate_import.copy() self.rate_import = self.rate_add_io_slots(self.rate_import, self.octopus_slots) self.load_saving_slot(self.octopus_saving_slots, export=False, rate_replicate=self.rate_import_replicated) @@ -973,6 +990,23 @@ def fetch_sensor_data(self, save=True): if self.rate_export: self.rate_scan_export(self.rate_export, print=False) self.rate_export, self.rate_export_replicated = self.rate_replicate(self.rate_export, is_import=False) + + # Persist base export rates to storage (only non-replicated/non-override data) + if self.rate_store: + today = datetime.now() + for minute in self.rate_export: + # Only persist true API data, not replicated or override data + if minute not 
in self.rate_export_replicated or self.rate_export_replicated[minute] == "got": + # Get corresponding import rate or use 0 + import_rate = self.rate_import.get(minute, 0) if self.rate_import else 0 + self.rate_store.write_base_rate(today, minute, import_rate, self.rate_export[minute]) + + # Rehydrate finalized rates from storage - these take priority over fresh API data + for minute in range(0, self.minutes_now): + finalized_rate = self.rate_store.get_rate(today, minute, is_import=False) + if finalized_rate is not None: + self.rate_export[minute] = finalized_rate + # For export tariff only load the saving session if enabled if self.rate_export_max > 0: self.load_saving_slot(self.octopus_saving_slots, export=True, rate_replicate=self.rate_export_replicated) @@ -988,6 +1022,13 @@ def fetch_sensor_data(self, save=True): if self.rate_import or self.rate_export: self.set_rate_thresholds() + # Finalise past slots (5+ minutes past slot start) + if self.rate_store: + today = datetime.now() + finalized = self.rate_store.finalise_slots(today, self.minutes_now) + if finalized > 0: + self.log("Finalized {} rate slots".format(finalized)) + + # Find discharging windows if self.rate_export: self.high_export_rates, lowest, highest = self.rate_scan_window(self.rate_export, 5, self.rate_export_cost_threshold, True, alt_rates=self.rate_import) @@ -1390,6 +1431,14 @@ def apply_manual_rates(self, rates, manual_items, is_import=True, rate_replicate rates[minute] = rate rate_replicate[minute] = "manual" + # Track manual override in rate store + if self.rate_store: + today = datetime.now() + if is_import: + self.rate_store.update_manual_override(today, minute, rate, None) + else: + self.rate_store.update_manual_override(today, minute, None, rate) + return rates def basic_rates(self, info, rtype, prev=None, rate_replicate={}): diff --git a/apps/predbat/octopus.py b/apps/predbat/octopus.py index 86e706006..ac37aed62 100644 --- a/apps/predbat/octopus.py +++ b/apps/predbat/octopus.py @@ 
-1947,12 +1947,22 @@ def load_saving_slot(self, octopus_saving_slots, export=False, rate_replicate={} if minute in self.rate_export: self.rate_export[minute] += rate rate_replicate[minute] = "saving" + + # Track saving session override in rate store + if self.rate_store: + today = datetime.now() + self.rate_store.update_auto_override(today, minute, None, self.rate_export[minute], "Saving") else: if minute in self.rate_import: self.rate_import[minute] += rate self.load_scaling_dynamic[minute] = self.load_scaling_saving rate_replicate[minute] = "saving" + # Track saving session override in rate store + if self.rate_store: + today = datetime.now() + self.rate_store.update_auto_override(today, minute, self.rate_import[minute], None, "Saving") + def decode_octopus_slot(self, slot, raw=False): """ Decode IOG slot @@ -2149,10 +2159,20 @@ def rate_add_io_slots(self, rates, octopus_slots): slots_added_set.add(slot_start) rates[minute] = assumed_price + # Track IOG override in rate store + if self.rate_store: + today = datetime.now() + self.rate_store.update_auto_override(today, minute, assumed_price, None, "IOG") + else: # For minutes within a 30-min slot, only apply if the slot was added if slot_start in slots_added_set: rates[minute] = assumed_price + + # Track IOG override in rate store + if self.rate_store: + today = datetime.now() + self.rate_store.update_auto_override(today, minute, assumed_price, None, "IOG") else: assumed_price = self.rate_import.get(start_minutes, self.rate_min) diff --git a/apps/predbat/persistent_store.py b/apps/predbat/persistent_store.py new file mode 100644 index 000000000..689949102 --- /dev/null +++ b/apps/predbat/persistent_store.py @@ -0,0 +1,189 @@ +# ----------------------------------------------------------------------------- +# Predbat Home Battery System +# Copyright Trefor Southwell 2026 - All Rights Reserved +# This application maybe used for personal use only and not for commercial use +# 
----------------------------------------------------------------------------- +# fmt: off +# pylint: disable=consider-using-f-string +# pylint: disable=line-too-long +# pylint: disable=attribute-defined-outside-init + +""" +Base class for persistent JSON file storage with backup and cleanup. +Provides common functionality for components needing to store state across restarts. +""" + +import json +import os +from datetime import datetime, timedelta +from pathlib import Path + + +class PersistentStore: + """ + Abstract base class for persistent JSON file storage. + Handles load/save with backup, cleanup of old files, and automatic timestamping. + """ + + def __init__(self, base): + """Initialize with reference to base PredBat instance""" + self.base = base + self.log = base.log + + def load(self, filepath): + """ + Load data from JSON file with automatic backup restoration on corruption. + + Args: + filepath: Path to JSON file to load + + Returns: + Loaded data dict or None if file doesn't exist or is corrupted + """ + try: + if not os.path.exists(filepath): + return None + + with open(filepath, 'r') as f: + data = json.load(f) + return data + + except (json.JSONDecodeError, IOError) as e: + self.log(f"Warn: Failed to load {filepath}: {e}") + + # Try to restore from backup + backup_path = filepath + '.bak' + if os.path.exists(backup_path): + try: + self.log(f"Warn: Attempting to restore from backup: {backup_path}") + with open(backup_path, 'r') as f: + data = json.load(f) + self.log(f"Warn: Successfully restored from backup") + return data + except (json.JSONDecodeError, IOError) as e2: + self.log(f"Error: Backup restoration failed: {e2}") + + return None + + def save(self, filepath, data, backup=True): + """ + Save data to JSON file with automatic backup and timestamp. 
+ + Args: + filepath: Path to JSON file to save + data: Dict to save (will add last_updated timestamp) + backup: Whether to backup existing file before overwrite + + Returns: + True if successful, False otherwise + """ + try: + # Add timestamp + data['last_updated'] = datetime.now().astimezone().isoformat() + + # Create directory if needed + os.makedirs(os.path.dirname(filepath), exist_ok=True) + + # Backup existing file if requested + if backup and os.path.exists(filepath): + self.backup_file(filepath) + + # Write new file + with open(filepath, 'w') as f: + json.dump(data, f, indent=2) + + # Cleanup old backups + self.cleanup_backups(filepath) + + return True + + except (IOError, OSError) as e: + self.log(f"Error: Failed to save {filepath}: {e}") + return False + + def backup_file(self, filepath): + """ + Create backup copy of file. + + Args: + filepath: Path to file to backup + """ + try: + backup_path = filepath + '.bak' + if os.path.exists(filepath): + import shutil + shutil.copy2(filepath, backup_path) + except (IOError, OSError) as e: + self.log(f"Warn: Failed to backup {filepath}: {e}") + + def cleanup_backups(self, filepath): + """ + Remove backup files older than 1 day. + + Args: + filepath: Path to main file (will check for .bak file) + """ + try: + backup_path = filepath + '.bak' + if os.path.exists(backup_path): + # Check file age + file_time = datetime.fromtimestamp(os.path.getmtime(backup_path)) + age = datetime.now() - file_time + + if age > timedelta(days=1): + os.remove(backup_path) + self.log(f"Info: Cleaned up old backup: {backup_path}") + + except (IOError, OSError) as e: + self.log(f"Warn: Failed to cleanup backup for {filepath}: {e}") + + def cleanup(self, directory, pattern, retention_days): + """ + Remove files matching pattern older than retention period. 
+ + Args: + directory: Directory to search + pattern: Glob pattern for files to cleanup + retention_days: Number of days to retain files + + Returns: + Number of files removed + """ + try: + if not os.path.exists(directory): + return 0 + + path = Path(directory) + cutoff_time = datetime.now() - timedelta(days=retention_days) + removed_count = 0 + + for file_path in path.glob(pattern): + try: + file_time = datetime.fromtimestamp(file_path.stat().st_mtime) + if file_time < cutoff_time: + file_path.unlink() + removed_count += 1 + self.log(f"Info: Cleaned up old file: {file_path}") + except (IOError, OSError) as e: + self.log(f"Warn: Failed to remove {file_path}: {e}") + + return removed_count + + except Exception as e: + self.log(f"Error: Cleanup failed for {directory}/{pattern}: {e}") + return 0 + + def get_last_updated(self, filepath): + """ + Get last_updated timestamp from JSON file. + + Args: + filepath: Path to JSON file + + Returns: + ISO 8601 timestamp string or None + """ + data = self.load(filepath) + if data and 'last_updated' in data: + return data['last_updated'] + return None diff --git a/apps/predbat/predbat.py b/apps/predbat/predbat.py index c39fb7ead..726b905f9 100644 --- a/apps/predbat/predbat.py +++ b/apps/predbat/predbat.py @@ -30,7 +30,7 @@ THIS_VERSION = "v8.33.0" # fmt: off -PREDBAT_FILES = ["predbat.py", "const.py", "hass.py", "config.py", "prediction.py", "gecloud.py", "utils.py", "inverter.py", "ha.py", "download.py", "web.py", "web_helper.py", "predheat.py", "futurerate.py", "octopus.py", "solcast.py", "execute.py", "plan.py", "fetch.py", "output.py", "userinterface.py", "energydataservice.py", "alertfeed.py", "compare.py", "db_manager.py", "db_engine.py", "plugin_system.py", "ohme.py", "components.py", "fox.py", "carbon.py", "temperature.py", "web_mcp.py", "component_base.py", "axle.py", "solax.py", "solis.py", "unit_test.py", "load_ml_component.py", "load_predictor.py"] +PREDBAT_FILES = ["predbat.py", "const.py", "hass.py", "config.py", 
"prediction.py", "gecloud.py", "utils.py", "inverter.py", "ha.py", "download.py", "web.py", "web_helper.py", "predheat.py", "futurerate.py", "octopus.py", "solcast.py", "execute.py", "plan.py", "fetch.py", "output.py", "userinterface.py", "energydataservice.py", "alertfeed.py", "compare.py", "db_manager.py", "db_engine.py", "plugin_system.py", "ohme.py", "components.py", "fox.py", "carbon.py", "temperature.py", "web_mcp.py", "component_base.py", "axle.py", "solax.py", "solis.py", "unit_test.py", "load_ml_component.py", "load_predictor.py", "persistent_store.py", "rate_store.py"] # fmt: on from download import predbat_update_move, predbat_update_download, check_install @@ -76,6 +76,7 @@ from userinterface import UserInterface from compare import Compare from plugin_system import PluginSystem +from rate_store import RateStore class PredBat(hass.Hass, Octopus, Energidataservice, Fetch, Plan, Execute, Output, UserInterface): @@ -474,6 +475,7 @@ def reset(self): self.rate_import_no_io = {} self.rate_export = {} self.rate_gas = {} + self.rate_store = None self.rate_slots = [] self.low_rates = [] self.high_export_rates = [] @@ -1493,6 +1495,8 @@ def initialize(self): self.validate_config() self.comparison = Compare(self) + self.rate_store = RateStore(self) + self.components.initialize(phase=1) if not self.components.start(phase=1): self.log("Error: Some components failed to start (phase1)") diff --git a/apps/predbat/rate_store.py b/apps/predbat/rate_store.py new file mode 100644 index 000000000..2c8d70117 --- /dev/null +++ b/apps/predbat/rate_store.py @@ -0,0 +1,474 @@ +# ----------------------------------------------------------------------------- +# Predbat Home Battery System +# Copyright Trefor Southwell 2026 - All Rights Reserved +# This application maybe used for personal use only and not for commercial use +# ----------------------------------------------------------------------------- +# fmt: off +# pylint: disable=consider-using-f-string +# pylint: 
disable=line-too-long +# pylint: disable=attribute-defined-outside-init + +""" +Persistent storage for import and export rates with finalization logic. +Stores rates at the time they are first retrieved and applies overrides separately, +preventing retrospective changes to historical cost calculations. +""" + +import os +from datetime import datetime, timedelta +from persistent_store import PersistentStore + + +class RateStore(PersistentStore): + """ + Manages persistent storage of energy rates with slot-based structure. + + Stores rates in 30-minute slots (configurable via plan_interval_minutes) with: + - initial: Base rate from API at first retrieval + - automatic: Override from external services (IOG, Axle, saving sessions) + - manual: User override from manual selectors + - finalised: Lock flag set 5 minutes past slot start time + + File structure: predbat_save/rates_YYYY_MM_DD.json + """ + + def __init__(self, base, save_dir="predbat_save"): + """ + Initialize rate store. + + Args: + base: PredBat instance + save_dir: Directory for rate files (relative to workspace root) + """ + super().__init__(base) + self.save_dir = save_dir + self.plan_interval_minutes = base.plan_interval_minutes + + # In-memory cache of loaded rate files + # Key: date string "YYYY-MM-DD", Value: rate data dict + self.rate_cache = {} + + # Load and finalize rates for today and yesterday + today = datetime.now() + yesterday = today - timedelta(days=1) + self.load_rates(today) + self.load_rates(yesterday) + + # Finalise past slots + finalised_today = self.finalise_slots(today, base.minutes_now) + finalised_yesterday = self.finalise_slots(yesterday, 24 * 60) # Finalise all yesterday slots + if finalised_today > 0 or finalised_yesterday > 0: + self.log("Finalised {} slots for today and {} slots for yesterday".format(finalised_today, finalised_yesterday)) + + # Cleanup old rate files + retention_days = base.get_arg("rate_retention_days", 7) + removed = self.cleanup_old_files(retention_days) + 
if removed > 0: + self.log("Cleaned up {} old rate files".format(removed)) + + def _get_filepath(self, date): + """ + Get filepath for rate file for given date. + + Args: + date: datetime object + + Returns: + Full path string to rate JSON file + """ + date_str = date.strftime("%Y_%m_%d") + filename = f"rates_{date_str}.json" + return os.path.join(self.save_dir, filename) + + def _get_date_key(self, date): + """Get cache key for date""" + return date.strftime("%Y-%m-%d") + + def _minutes_to_time(self, minutes): + """ + Convert minute offset from midnight to HH:MM string. + + Args: + minutes: Minutes since midnight + + Returns: + Time string in format "HH:MM" + """ + hours = int(minutes // 60) + mins = int(minutes % 60) + return f"{hours:02d}:{mins:02d}" + + def _time_to_minutes(self, time_str): + """ + Convert HH:MM string to minute offset from midnight. + + Args: + time_str: Time in format "HH:MM" + + Returns: + Minutes since midnight as int + """ + parts = time_str.split(':') + return int(parts[0]) * 60 + int(parts[1]) + + def _get_slot_start(self, minutes): + """ + Get slot start time for given minute offset. + Uses same calculation as output.py line 997. + + Args: + minutes: Minute offset from midnight + + Returns: + Slot start minute offset + """ + return int(minutes / self.plan_interval_minutes) * self.plan_interval_minutes + + def _init_empty_structure(self): + """ + Create empty rate data structure. + + Returns: + Dict with plan_interval_minutes and empty import/export rate dicts + """ + return { + 'plan_interval_minutes': self.plan_interval_minutes, + 'rates_import': {}, + 'rates_export': {} + } + + def _init_empty_slot(self): + """ + Create empty slot structure. + + Returns: + Dict with initial/automatic/manual/finalised fields + """ + return { + 'initial': None, + 'automatic': None, + 'manual': None, + 'finalised': False + } + + def load_rates(self, date): + """ + Load rate data for given date into cache. 
+ + Args: + date: datetime object for date to load + + Returns: + Rate data dict or None if file doesn't exist + """ + date_key = self._get_date_key(date) + + # Check if already cached + if date_key in self.rate_cache: + return self.rate_cache[date_key] + + # Load from file + filepath = self._get_filepath(date) + data = self.load(filepath) + + if data is None: + # Initialize empty structure + data = self._init_empty_structure() + + # Validate plan_interval_minutes matches + if 'plan_interval_minutes' in data: + if data['plan_interval_minutes'] != self.plan_interval_minutes: + self.log(f"Error: Rate file {filepath} has plan_interval_minutes={data['plan_interval_minutes']} but current config is {self.plan_interval_minutes}. Creating backup and starting fresh.") + # Backup old file + self.backup_file(filepath) + # Start with empty structure + data = self._init_empty_structure() + else: + # Old file format, add field + data['plan_interval_minutes'] = self.plan_interval_minutes + + # Ensure structure exists + if 'rates_import' not in data: + data['rates_import'] = {} + if 'rates_export' not in data: + data['rates_export'] = {} + + # Cache it + self.rate_cache[date_key] = data + + return data + + def save_rates(self, date): + """ + Save rate data for given date from cache to file. + + Args: + date: datetime object for date to save + + Returns: + True if successful + """ + date_key = self._get_date_key(date) + + if date_key not in self.rate_cache: + self.log(f"Warn: No rate data in cache for {date_key}") + return False + + data = self.rate_cache[date_key] + filepath = self._get_filepath(date) + + return self.save(filepath, data, backup=True) + + def write_base_rate(self, date, minute, rate_import, rate_export): + """ + Write initial base rate for a slot (only if not already set). + This captures the rate at first retrieval from API. 
+ + Args: + date: datetime object for the date + minute: Minute offset from midnight + rate_import: Import rate value + rate_export: Export rate value + """ + # Load rate data + data = self.load_rates(date) + + # Get slot start + slot_start = self._get_slot_start(minute) + slot_time = self._minutes_to_time(slot_start) + + # Initialize slots if needed + if slot_time not in data['rates_import']: + data['rates_import'][slot_time] = self._init_empty_slot() + if slot_time not in data['rates_export']: + data['rates_export'][slot_time] = self._init_empty_slot() + + # Only write initial rate if not already set + if data['rates_import'][slot_time]['initial'] is None: + data['rates_import'][slot_time]['initial'] = rate_import + + if data['rates_export'][slot_time]['initial'] is None: + data['rates_export'][slot_time]['initial'] = rate_export + + # Save immediately + self.save_rates(date) + + def update_auto_override(self, date, minute, rate_import, rate_export, source): + """ + Update automatic override rate for a slot (IOG, Axle, saving sessions). + Only updates non-finalised slots. 
+ + Args: + date: datetime object for the date + minute: Minute offset from midnight + rate_import: Import rate value or None to clear + rate_export: Export rate value or None to clear + source: String identifying override source (e.g., "IOG", "Axle") + """ + # Load rate data + data = self.load_rates(date) + + # Get slot start + slot_start = self._get_slot_start(minute) + slot_time = self._minutes_to_time(slot_start) + + # Initialize slots if needed + if slot_time not in data['rates_import']: + data['rates_import'][slot_time] = self._init_empty_slot() + if slot_time not in data['rates_export']: + data['rates_export'][slot_time] = self._init_empty_slot() + + if data['rates_import'][slot_time]['finalised']: + # Don't modify finalised slots + return + + # Store override with source tracking + import_slot = data['rates_import'][slot_time] + export_slot = data['rates_export'][slot_time] + + if rate_import is not None: + import_slot['automatic'] = { + 'rate': rate_import, + 'source': source + } + else: + # Clear override + import_slot['automatic'] = None + + if rate_export is not None: + export_slot['automatic'] = { + 'rate': rate_export, + 'source': source + } + else: + # Clear override + export_slot['automatic'] = None + + # Save immediately + self.save_rates(date) + + def update_manual_override(self, date, minute, rate_import, rate_export): + """ + Update manual override rate for a slot (from user selectors). + Only updates non-finalised slots. 
+ + Args: + date: datetime object for the date + minute: Minute offset from midnight + rate_import: Import rate value or None to clear + rate_export: Export rate value or None to clear + """ + # Load rate data + data = self.load_rates(date) + + # Get slot start + slot_start = self._get_slot_start(minute) + slot_time = self._minutes_to_time(slot_start) + + # Initialize slots if needed + if slot_time not in data['rates_import']: + data['rates_import'][slot_time] = self._init_empty_slot() + if slot_time not in data['rates_export']: + data['rates_export'][slot_time] = self._init_empty_slot() + + # Check if slot is finalised + if data['rates_import'][slot_time]['finalised']: + # Don't modify finalised slots + return + + # Store manual override + data['rates_import'][slot_time]['manual'] = rate_import + data['rates_export'][slot_time]['manual'] = rate_export + + # Save immediately + self.save_rates(date) + + def finalise_slots(self, date, current_minute): + """ + Finalise all slots that have passed their start time by 5+ minutes. + Finalised slots cannot be modified by overrides. + + Args: + date: datetime object for the date + current_minute: Current minute offset from midnight + + Returns: + Number of slots finalised + """ + # Load rate data + data = self.load_rates(date) + + finalised_count = 0 + + # Process all slots + for slot_time in data['rates_import'].keys(): + slot_minute = self._time_to_minutes(slot_time) + + # Check if slot should be finalised + # Finalise if current time is 5+ minutes past slot start + if current_minute >= slot_minute + 5: + if not data['rates_import'][slot_time]['finalised']: + data['rates_import'][slot_time]['finalised'] = True + data['rates_export'][slot_time]['finalised'] = True + finalised_count += 1 + + if finalised_count > 0: + self.save_rates(date) + + return finalised_count + + def get_rate(self, date, minute, is_import=True): + """ + Get effective rate for a given time. 
+ Returns manual override > automatic override > initial rate > None. + + Args: + date: datetime object for the date + minute: Minute offset from midnight + is_import: True for import rate, False for export rate + + Returns: + Rate value (float) or None if not found + """ + # Load rate data + data = self.load_rates(date) + + # Get slot start + slot_start = self._get_slot_start(minute) + slot_time = self._minutes_to_time(slot_start) + + # Select import or export rates + rates = data['rates_import'] if is_import else data['rates_export'] + + if slot_time not in rates: + return None + + slot = rates[slot_time] + + # Priority: manual > automatic > initial > None + if slot['manual'] is not None: + return slot['manual'] + + if slot['automatic'] is not None: + # Handle dict format with source tracking + if isinstance(slot['automatic'], dict): + return slot['automatic']['rate'] + return slot['automatic'] + + if slot['initial'] is not None: + return slot['initial'] + + return None + + def get_automatic_rate(self, date, minute, is_import=True): + """ + Get the automatic override rate (ignoring manual overrides). + Used for displaying what automatic systems are doing.
+ + Args: + date: datetime object for the date + minute: Minute offset from midnight + is_import: True for import rate, False for export rate + + Returns: + Rate value (float) or None if no automatic override + """ + # Load rate data + data = self.load_rates(date) + + # Get slot start + slot_start = self._get_slot_start(minute) + slot_time = self._minutes_to_time(slot_start) + + # Select import or export rates + rates = data['rates_import'] if is_import else data['rates_export'] + + if slot_time not in rates: + return None + + slot = rates[slot_time] + + # Return automatic override if set + if slot['automatic'] is not None: + # Handle dict format with source tracking + if isinstance(slot['automatic'], dict): + return slot['automatic']['rate'] + return slot['automatic'] + + # Fall back to initial rate + if slot['initial'] is not None: + return slot['initial'] + + return None + + def cleanup_old_files(self, retention_days): + """ + Remove rate files older than retention period. + + Args: + retention_days: Number of days to retain files + + Returns: + Number of files removed + """ + return self.cleanup(self.save_dir, "rates_*.json", retention_days) diff --git a/apps/predbat/tests/test_axle.py b/apps/predbat/tests/test_axle.py index 2b17a4bf3..9ab8551de 100644 --- a/apps/predbat/tests/test_axle.py +++ b/apps/predbat/tests/test_axle.py @@ -870,6 +870,7 @@ def __init__(self): self.minutes_now = 10 * 60 # 10:00 AM self.forecast_minutes = 24 * 60 # 24 hours self.prefix = "predbat" + self.rate_store = None # No rate persistence in tests # Initialize rate_export with base rates for each minute self.rate_export = {} diff --git a/apps/predbat/tests/test_rate_store.py b/apps/predbat/tests/test_rate_store.py new file mode 100644 index 000000000..8c908be0e --- /dev/null +++ b/apps/predbat/tests/test_rate_store.py @@ -0,0 +1,357 @@ +# ----------------------------------------------------------------------------- +# Predbat Home Battery System +# Copyright Trefor Southwell 2026 - 
# All Rights Reserved
# This application may be used for personal use only and not for commercial use
# -----------------------------------------------------------------------------
# fmt: off
# pylint: disable=consider-using-f-string
# pylint: disable=line-too-long
# pylint: disable=attribute-defined-outside-init

import os
import json
import shutil
from datetime import datetime, timedelta

from rate_store import RateStore


class _MockBase:
    """
    Minimal stand-in for the PredBat base object required by RateStore.

    RateStore only touches plan_interval_minutes, minutes_now, log() and
    get_arg(); this class was previously duplicated verbatim inside every
    test function, so it is hoisted to module level once.
    """

    def __init__(self):
        self.plan_interval_minutes = 30
        self.minutes_now = 720  # 12:00

    def log(self, msg):
        print(f" {msg}")

    def get_arg(self, key, default):
        # The store only queries rate_retention_days; everything else falls
        # through to the caller-supplied default
        if key == "rate_retention_days":
            return 7
        return default


def _make_store(test_dir):
    """Create *test_dir* (if needed) and return a RateStore bound to a fresh mock base."""
    os.makedirs(test_dir, exist_ok=True)
    return RateStore(_MockBase(), save_dir=test_dir)


def _midnight_today():
    """Return today's date at 00:00:00.000000 — the day anchor used by the store."""
    return datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)


def run_rate_store_tests(my_predbat):
    """
    Run comprehensive tests for rate persistence and finalisation

    Args:
        my_predbat: PredBat instance (unused for these tests but required for consistency)

    Returns:
        bool: False if all tests pass, True if any test fails
    """
    failed = False

    # Fresh scratch directory for all sub-tests
    test_dir = "test_rate_store_temp"
    if os.path.exists(test_dir):
        shutil.rmtree(test_dir)
    os.makedirs(test_dir)

    try:
        print("*** Test 1: Basic rate persistence")
        failed |= test_basic_persistence(os.path.join(test_dir, "test1"))

        print("*** Test 2: Rate finalisation")
        failed |= test_finalisation(os.path.join(test_dir, "test2"))

        print("*** Test 3: Override priority (manual > automatic > initial)")
        failed |= test_override_priority(os.path.join(test_dir, "test3"))

        print("*** Test 4: Finalised rates resist fresh API data")
        failed |= test_finalised_resistance(os.path.join(test_dir, "test4"))

        print("*** Test 5: Cleanup old files")
        failed |= test_cleanup(os.path.join(test_dir, "test5"))

    finally:
        # Remove the scratch directory even when a test raised
        if os.path.exists(test_dir):
            shutil.rmtree(test_dir)

    return failed


def test_basic_persistence(test_dir):
    """Test basic write and read of rates"""
    store = _make_store(test_dir)
    today = _midnight_today()

    # Write one import/export rate pair per hour of the day
    for hour in range(24):
        minute = hour * 60
        import_rate = 10.0 + hour  # Rates from 10.0 to 33.0
        export_rate = 5.0 + hour  # Rates from 5.0 to 28.0
        store.write_base_rate(today, minute, import_rate, export_rate)

    # Read everything back and compare within a small float tolerance
    for hour in range(24):
        minute = hour * 60
        expected_import = 10.0 + hour
        expected_export = 5.0 + hour

        actual_import = store.get_rate(today, minute, is_import=True)
        actual_export = store.get_rate(today, minute, is_import=False)

        if actual_import is None or abs(actual_import - expected_import) > 0.01:
            print(f" ERROR: Expected import rate {expected_import} at minute {minute}, got {actual_import}")
            return True

        if actual_export is None or abs(actual_export - expected_export) > 0.01:
            print(f" ERROR: Expected export rate {expected_export} at minute {minute}, got {actual_export}")
            return True

    print(" PASS: Basic persistence working")
    return False


def test_finalisation(test_dir):
    """Test that rates become finalised after their slot time + buffer"""
    store = _make_store(test_dir)
    today = _midnight_today()

    # Write rates for slots that should be finalised (00:00, 00:30, 01:00)
    store.write_base_rate(today, 0, 15.0, 5.0)
    store.write_base_rate(today, 30, 20.0, 10.0)
    store.write_base_rate(today, 60, 25.0, 15.0)

    # Finalise past slots (set current minute to 70 which is past 01:00+5min buffer)
    store.finalise_slots(today, 70)

    # Inspect the on-disk JSON directly to confirm the finalised flags were written
    date_str = today.strftime("%Y_%m_%d")
    file_path = os.path.join(test_dir, f"rates_{date_str}.json")

    if not os.path.exists(file_path):
        print(f" ERROR: Rate file not found at {file_path}")
        return True

    with open(file_path, "r") as f:
        data = json.load(f)

    if "rates_import" not in data or "rates_export" not in data:
        print(" ERROR: Missing rate sections in file")
        return True

    # Data-driven version of the four copy-pasted flag checks; the export
    # side is spot-checked on the first slot only, matching the original
    checks = [
        ("rates_import", "00:00", "Slot 00:00 import should be finalised"),
        ("rates_export", "00:00", "Slot 00:00 export should be finalised"),
        ("rates_import", "00:30", "Slot 00:30 import should be finalised"),
        ("rates_import", "01:00", "Slot 01:00 import should be finalised"),
    ]
    for section, slot, message in checks:
        if slot not in data[section] or not data[section][slot]["finalised"]:
            print(f" ERROR: {message}")
            return True

    print(" PASS: Finalisation working correctly")
    return False


def test_override_priority(test_dir):
    """Test that manual overrides take priority over automatic, which take priority over initial"""
    store = _make_store(test_dir)
    today = _midnight_today()
    minute = 120  # 02:00

    # Write initial rate
    store.write_base_rate(today, minute, 10.0, 5.0)

    rate = store.get_rate(today, minute, is_import=True)
    if rate is None or abs(rate - 10.0) > 0.01:
        print(f" ERROR: Expected initial import rate 10.0, got {rate}")
        return True

    # Apply automatic override (IOG) — should shadow the initial rate
    store.update_auto_override(today, minute, 5.0, 2.0, source="IOG")

    rate = store.get_rate(today, minute, is_import=True)
    if rate is None or abs(rate - 5.0) > 0.01:
        print(f" ERROR: Expected automatic import rate 5.0, got {rate}")
        return True

    # The automatic layer must also be readable directly
    auto_rate = store.get_automatic_rate(today, minute, is_import=True)
    if auto_rate is None or abs(auto_rate - 5.0) > 0.01:
        print(f" ERROR: Expected get_automatic_rate 5.0, got {auto_rate}")
        return True

    # Apply manual override — should shadow the automatic rate
    store.update_manual_override(today, minute, 3.0, 1.0)

    rate = store.get_rate(today, minute, is_import=True)
    if rate is None or abs(rate - 3.0) > 0.01:
        print(f" ERROR: Expected manual import rate 3.0, got {rate}")
        return True

    # The automatic layer must survive underneath the manual override
    auto_rate = store.get_automatic_rate(today, minute, is_import=True)
    if auto_rate is None or abs(auto_rate - 5.0) > 0.01:
        print(f" ERROR: Automatic rate should still be 5.0, got {auto_rate}")
        return True

    print(" PASS: Override priority working correctly")
    return False


def test_finalised_resistance(test_dir):
    """Test that finalised rates resist new API data"""
    store = _make_store(test_dir)
    today = _midnight_today()
    minute = 0  # 00:00

    # Write initial rate
    store.write_base_rate(today, minute, 15.0, 5.0)

    # Finalise it (minute 10 is past minute 0 + 5 minute buffer)
    store.finalise_slots(today, 10)

    # Try to overwrite with new API data
    store.write_base_rate(today, minute, 25.0, 10.0)

    # Should still be 15.0 (finalised rate resists changes)
    rate = store.get_rate(today, minute, is_import=True)
    if rate is None or abs(rate - 15.0) > 0.01:
        print(f" ERROR: Finalised import rate changed from 15.0 to {rate}")
        return True

    # Export should also resist
    export_rate = store.get_rate(today, minute, is_import=False)
    if export_rate is None or abs(export_rate - 5.0) > 0.01:
        print(f" ERROR: Finalised export rate changed from 5.0 to {export_rate}")
        return True

    print(" PASS: Finalised rates resist new API data")
    return False


def test_cleanup(test_dir):
    """Test cleanup of old rate files"""
    store = _make_store(test_dir)
    today = _midnight_today()

    # Manufacture 10 daily rate files going back in time
    for days_ago in range(10):
        old_date = today - timedelta(days=days_ago)
        date_str = old_date.strftime("%Y_%m_%d")
        file_path = os.path.join(test_dir, f"rates_{date_str}.json")

        # Write dummy data
        with open(file_path, "w") as f:
            json.dump({"rates_import": {}, "rates_export": {}, "last_updated": old_date.isoformat()}, f)

        # Backdate the file's mtime so mtime-based cleanup sees the right age.
        # NOTE: midnight minus 12h is noon of the *previous* day (the original
        # comment said "noon of that day", which was inaccurate).
        old_timestamp = (old_date - timedelta(hours=12)).timestamp()
        os.utime(file_path, (old_timestamp, old_timestamp))

    # Run cleanup with 7 days retention
    retention_days = 7
    store.cleanup_old_files(retention_days)

    # Should have at most retention_days + today = 8 files remaining
    remaining_files = [f for f in os.listdir(test_dir) if f.startswith("rates_") and f.endswith(".json") and not f.endswith(".bak")]

    if len(remaining_files) > 8:
        print(f" ERROR: Expected <= 8 files after cleanup, found {len(remaining_files)}")
        print(f" Files: {sorted(remaining_files)}")
        return True

    print(" PASS: Cleanup working correctly")
    return False


# ---------------------------------------------------------------------------
# NOTE(review): the same change-set also registers this suite in unit_test.py:
#   from tests.test_rate_store import run_rate_store_tests
# and in main()'s test table:
#   ("rate_store", run_rate_store_tests, "Rate Store persistence and finalization tests (write, rehydrate, finalize, priority, cleanup)", False),
# ---------------------------------------------------------------------------