Skip to content
109 changes: 58 additions & 51 deletions apps/predbat/ha.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ class HAHistory(ComponentBase):
def initialize(self):
    """Set up the empty per-entity history caches and the lock guarding them."""
    # Serializes concurrent access to the caches below.
    self.history_lock = threading.Lock()
    # entity_id -> requested history depth in days.
    self.history_entities = {}
    # entity_id -> cached list of history entries.
    self.history_data = {}

def add_entity(self, entity_id, days):
"""
Expand All @@ -80,9 +81,11 @@ def get_history(self, entity_id, days=30, tracked=True):
"""
result = None

if self.history_data.get(entity_id, None) and self.history_entities.get(entity_id, 0) >= days:
result = [self.history_data[entity_id]]
else:
with self.history_lock:
if self.history_data.get(entity_id, None) and self.history_entities.get(entity_id, 0) >= days:
result = [self.history_data[entity_id]]

if result is None:
Comment on lines +84 to +88
Copy link

Copilot AI Feb 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The lock is released before calling add_entity and accessing history_entities (lines 94-95 outside the diff). This creates a potential race condition where another thread could modify history_entities between the lock release and these accesses. Consider expanding the lock scope to include the entire cache-check-and-fetch logic, or add locking to the add_entity method to protect all accesses to history_entities.

Copilot uses AI. Check for mistakes.
ha_interface = self.base.components.get_component("ha")
if not ha_interface:
self.log("Error: HAHistory: No HAInterface available, cannot fetch history")
Expand All @@ -103,23 +106,25 @@ def prune_history(self, now):
"""
Prune history data older than required
"""
for entity_id in list(self.history_data.keys()):
max_days = self.history_entities.get(entity_id, 30)
cutoff_time = now - timedelta(days=max_days)
new_history = []
keep_all = False
for entry in self.history_data[entity_id]:
if keep_all:
new_history.append(entry)
else:
last_updated = entry.get("last_updated", None)
if last_updated:
entry_time = str2time(last_updated)
if entry_time >= cutoff_time:
new_history.append(entry)
# Keep remaining entries now as they are in order
keep_all = True
self.history_data[entity_id] = new_history
with self.history_lock:
for entity_id in list(self.history_data.keys()):
max_days = self.history_entities.get(entity_id, 30)
cutoff_time = now - timedelta(days=max_days)
new_history = []
keep_all = False
for entry in self.history_data[entity_id]:
if keep_all:
new_history.append(entry)
else:
last_updated = entry.get("last_updated", None)
if last_updated:
entry_time = str2time(last_updated)
if entry_time >= cutoff_time:
new_history.append(entry)
# Keep remaining entries now as they are in order
keep_all = True
# Note: Entries without last_updated are dropped
self.history_data[entity_id] = new_history

def update_entity(self, entity_id, new_history_data):
"""
Expand All @@ -139,36 +144,37 @@ def update_entity(self, entity_id, new_history_data):
for entry_attr in FILTER_ENTRIES:
entry.pop(entry_attr, None)

current_history_data = self.history_data.get(entity_id, None)
if current_history_data and len(current_history_data) > 0:
first_updated = current_history_data[0].get("last_updated", None)
last_updated = current_history_data[-1].get("last_updated", None)
else:
first_updated = None
last_updated = None

if last_updated:
# Find the last timestamp in the previous history data, data is always in order from oldest to newest
first_timestamp = str2time(first_updated)
last_timestamp = str2time(last_updated)
# Scan new data, using the timestamp only add new entries
add_all = False
for entry in new_history_data:
if add_all:
self.history_data[entity_id].append(entry)
else:
this_updated = entry.get("last_updated", None)
if this_updated:
entry_time = str2time(this_updated)
if entry_time > last_timestamp:
self.history_data[entity_id].append(entry)
add_all = True # Remaining entries are all newer
elif entry_time < first_timestamp:
self.history_data[entity_id].append(entry)

self.history_data[entity_id].sort(key=lambda x: x.get("last_updated"))
else:
self.history_data[entity_id] = new_history_data
with self.history_lock:
current_history_data = self.history_data.get(entity_id, None)
if current_history_data and len(current_history_data) > 0:
first_updated = current_history_data[0].get("last_updated", None)
last_updated = current_history_data[-1].get("last_updated", None)
else:
first_updated = None
last_updated = None

if last_updated:
# Find the last timestamp in the previous history data, data is always in order from oldest to newest
first_timestamp = str2time(first_updated)
last_timestamp = str2time(last_updated)
# Scan new data, using the timestamp only add new entries
add_all = False
for entry in new_history_data:
if add_all:
self.history_data[entity_id].append(entry)
else:
this_updated = entry.get("last_updated", None)
if this_updated:
entry_time = str2time(this_updated)
if entry_time > last_timestamp:
self.history_data[entity_id].append(entry)
add_all = True # Remaining entries are all newer
elif entry_time < first_timestamp:
self.history_data[entity_id].append(entry)

self.history_data[entity_id].sort(key=lambda x: x.get("last_updated"))
else:
self.history_data[entity_id] = new_history_data

# Update last success timestamp
self.update_success_timestamp()
Expand Down Expand Up @@ -202,8 +208,9 @@ async def run(self, seconds, first):

if first or (seconds % (60 * 60) == 0):
# Prune history data every hour
self.log("Info: HAHistory: Pruning history data")
self.log("Info: HAHistory: Pruning history data started.")
self.prune_history(datetime.now(self.local_tz))
self.log("Info: HAHistory: Pruning history data completed.")
return True


Expand Down
45 changes: 16 additions & 29 deletions apps/predbat/load_ml_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def initialize(self, load_ml_enable, load_ml_source=True, load_ml_max_days_histo
self.ml_epochs_update = 20
self.ml_min_days = 1
self.ml_validation_threshold = 2.0
self.ml_time_decay_days = 7
self.ml_time_decay_days = 30
self.ml_max_load_kw = 50.0
self.ml_max_model_age_hours = 48
self.ml_weight_decay = 0.01
Expand Down Expand Up @@ -409,8 +409,20 @@ async def run(self, seconds, first):
self.api_started = True
return True

# Determine if training is needed
is_initial = not self.initial_training_done

# Fetch fresh load data periodically (every 15 minutes)
should_fetch = first or ((seconds % PREDICTION_INTERVAL_SECONDS) == 0)
should_train = first or ((seconds % RETRAIN_INTERVAL_SECONDS) == 0)

if is_initial:
# Initial run, need to fetch data and train model before we can provide predictions
should_train = True
should_fetch = True
elif should_train:
# Training requires fetching
should_fetch = True

if should_fetch:
async with self.data_lock:
Expand Down Expand Up @@ -442,19 +454,9 @@ async def run(self, seconds, first):
self.log("ML Component: Insufficient data ({:.1f} days, need {})".format(self.load_data_age_days, self.ml_min_days))
return True

# Determine if training is needed
should_train = False
is_initial = False

if not self.initial_training_done:
# First training
should_train = True
is_initial = True
if is_initial:
self.log("ML Component: Starting initial training")
elif seconds % RETRAIN_INTERVAL_SECONDS == 0:
# Periodic fine-tuning every 2 hours
should_train = True
is_initial = False
elif should_train:
self.log("ML Component: Starting fine-tune training (2h interval)")

if should_train:
Expand Down Expand Up @@ -501,7 +503,7 @@ async def _do_training(self, is_initial):
export_rates=self.export_rates_data,
is_initial=is_initial,
epochs=epochs,
time_decay_days=self.ml_time_decay_days,
time_decay_days=min(self.ml_time_decay_days, self.load_data_age_days),
Copy link

Copilot AI Feb 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The use of max() here may not match the intended behavior. If load_data_age_days exceeds ml_time_decay_days (e.g., 60 days of data vs 30-day decay setting), this will use 60 as the decay timescale, giving the oldest samples (at day 60) a weight of exp(-60/60) ≈ 0.37. If the intention is to decay over 30 days regardless of data age, samples at day 60 should have weight exp(-60/30) ≈ 0.14, requiring time_decay_days to be capped at 30, not increased to 60. Consider whether min(ml_time_decay_days, load_data_age_days) would better match the intended exponential decay behavior, or clarify if the current max() logic is intentional to preserve weight for very old data.

Copilot uses AI. Check for mistakes.
)

if val_mae is not None:
Expand Down Expand Up @@ -618,18 +620,3 @@ def _publish_entity(self):
},
app="load_ml",
)
Copy link

Copilot AI Feb 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The removed is_alive() method had additional health checking logic that verified the component had been updated within the last 10 minutes. The base class implementation (ComponentBase.is_alive()) only checks if self.api_started is True, which is less strict and won't detect if the component becomes stuck or stops updating.

If the enhanced health checking was intentional and important for monitoring the LoadML component's health, this removal could reduce visibility into component health issues. Consider whether the base class implementation provides sufficient health monitoring or if the 10-minute timeout check should be retained.

Copilot uses AI. Check for mistakes.

def last_updated_time(self):
    """Timestamp of the most recent successful update, or None if none yet.

    Used by the component health check machinery.
    """
    timestamp = self.last_success_timestamp
    return timestamp

def is_alive(self):
    """Report whether the component is healthy.

    A disabled component is always healthy; an enabled one is healthy
    only when its last successful update is under 10 minutes old.
    """
    if not self.ml_enable:
        # Disabled components never fail the health check.
        return True

    last_success = self.last_success_timestamp
    if last_success is None:
        # Enabled but no successful update recorded yet.
        return False

    # Healthy only while the most recent success is fresh enough.
    elapsed = datetime.now(timezone.utc) - last_success
    return elapsed < timedelta(minutes=10)
53 changes: 48 additions & 5 deletions apps/predbat/load_predictor.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
PREDICT_HORIZON = 576 # 48 hours of predictions (576 * 5 min)
HIDDEN_SIZES = [512, 256, 128, 64] # Deeper network with more capacity
BATCH_SIZE = 128 # Smaller batches for better gradient estimates
FINETUNE_HOURS = 24 # Hours of data for fine-tuning
STEP_MINUTES = 5 # Minutes per step

# Feature constants
Expand Down Expand Up @@ -164,6 +163,26 @@ def _initialize_weights(self):
self.adam_t = 0
self.model_initialized = True

def _reset_adam_optimizer(self):
"""
Reset Adam optimizer momentum to zero.

Used when starting fine-tuning to prevent accumulated momentum
from previous training sessions causing overfitting on small
fine-tuning datasets.
"""
if not self.model_initialized:
return

for i in range(len(self.weights)):
self.m_weights[i] = np.zeros_like(self.weights[i])
self.v_weights[i] = np.zeros_like(self.weights[i])
self.m_biases[i] = np.zeros_like(self.biases[i])
self.v_biases[i] = np.zeros_like(self.biases[i])

self.adam_t = 0
self.log("ML Predictor: Reset Adam optimizer state for fine-tuning")

def _forward(self, X):
"""
Forward pass through the network.
Expand Down Expand Up @@ -439,7 +458,7 @@ def _create_dataset(self, load_minutes, now_utc, pv_minutes=None, temp_minutes=N

max_minute = max(energy_per_step.keys())

# Use the entire data set for both training and fine tuning
# Use all data for training
start_minute = 0
end_minute = max_minute

Expand Down Expand Up @@ -758,9 +777,24 @@ def train(self, load_minutes, now_utc, pv_minutes=None, temp_minutes=None, impor
if not self.model_initialized or (is_initial and self.weights is None):
self._initialize_weights()

# Training loop
best_val_loss = float("inf")
# Reset Adam optimizer state for fine-tuning to prevent accumulated
# momentum from causing overfitting on small fine-tuning datasets
if not is_initial and self.model_initialized:
self._reset_adam_optimizer()

# Compute baseline validation before training (shows loaded model performance)
baseline_mae = None
if not is_initial:
baseline_pred, _, _ = self._forward(X_val_norm)
baseline_pred_denorm = self._denormalize_predictions(baseline_pred)
baseline_mae = np.mean(np.abs(y_val - baseline_pred_denorm))
self.log("ML Predictor: Baseline (pre-finetune) val_mae={:.4f} kWh".format(baseline_mae))

# Training loop - use baseline as initial best for fine-tuning
best_val_loss = baseline_mae if baseline_mae is not None else float("inf")
patience_counter = 0
best_weights = None
best_biases = None

for epoch in range(epochs):
# Shuffle training data
Expand Down Expand Up @@ -802,17 +836,26 @@ def train(self, load_minutes, now_utc, pv_minutes=None, temp_minutes=None, impor

self.log("ML Predictor: Epoch {}/{}: train_loss={:.4f} val_mae={:.4f} kWh".format(epoch + 1, epochs, epoch_loss, val_mae))

# Early stopping check
# Early stopping check with weight checkpointing
if val_mae < best_val_loss:
best_val_loss = val_mae
patience_counter = 0
# Checkpoint best weights
best_weights = [w.copy() for w in self.weights]
best_biases = [b.copy() for b in self.biases]
else:
patience_counter += 1

if patience_counter >= patience:
self.log("ML Predictor: Early stopping at epoch {}".format(epoch + 1))
break

# Restore best weights after early stopping
if best_weights is not None and best_biases is not None:
self.weights = best_weights
self.biases = best_biases
self.log("ML Predictor: Restored best weights from epoch with val_mae={:.4f} kWh".format(best_val_loss))

self.training_timestamp = datetime.now(timezone.utc)
self.validation_mae = best_val_loss
self.epochs_trained += epochs
Expand Down
Loading