diff --git a/custom_components/span_panel/__init__.py b/custom_components/span_panel/__init__.py index 459d166..bb61e9d 100644 --- a/custom_components/span_panel/__init__.py +++ b/custom_components/span_panel/__init__.py @@ -21,6 +21,7 @@ # Import config flow to ensure it's registered from . import config_flow # noqa: F401 # type: ignore[misc] from .const import ( + CONF_PANEL_GEN, CONF_SIMULATION_CONFIG, CONF_SIMULATION_OFFLINE_MINUTES, CONF_SIMULATION_START_TIME, @@ -102,10 +103,20 @@ async def async_migrate_entry(hass: HomeAssistant, config_entry: ConfigEntry) -> return True +GEN3_PLATFORMS: list[Platform] = [ + Platform.BINARY_SENSOR, + Platform.SENSOR, +] + + async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: """Set up Span Panel from a config entry.""" _LOGGER.debug("SETUP ENTRY CALLED! Entry ID: %s, Version: %s", entry.entry_id, entry.version) + # Gen3 gRPC path — completely separate from Gen2 REST + if entry.data.get(CONF_PANEL_GEN) == "gen3": + return await _async_setup_gen3_entry(hass, entry) + # Migration flags will be handled by the coordinator during its update cycle async def ha_compatible_delay(seconds: float) -> None: @@ -329,8 +340,42 @@ async def _test_authenticated_connection() -> None: return True +async def _async_setup_gen3_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: + """Set up a Gen3 panel via gRPC.""" + from .gen3.coordinator import ( # pylint: disable=import-outside-toplevel + SpanGen3Coordinator, # noqa: E402 + ) + + coordinator = SpanGen3Coordinator(hass, entry) + if not await coordinator.async_setup(): + _LOGGER.error("Failed to connect to Gen3 panel at %s", entry.data.get("host")) + return False + + hass.data.setdefault(DOMAIN, {}) + hass.data[DOMAIN][entry.entry_id] = {COORDINATOR: coordinator, NAME: "SpanPanel"} + + await hass.config_entries.async_forward_entry_setups(entry, GEN3_PLATFORMS) + _LOGGER.info("Gen3 panel setup complete for %s", entry.data.get("host")) + return True + + +async def _async_unload_gen3_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: + """Unload a Gen3 panel entry.""" + unload_ok = await hass.config_entries.async_unload_platforms(entry, GEN3_PLATFORMS) + if unload_ok: + coordinator_data = hass.data[DOMAIN].pop(entry.entry_id, {}) + coordinator = coordinator_data.get(COORDINATOR) + if coordinator and hasattr(coordinator, "async_shutdown"): + await coordinator.async_shutdown() + return bool(unload_ok) + + async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: """Unload a config entry.""" + # Gen3 unload path + if entry.data.get(CONF_PANEL_GEN) == "gen3": + return await _async_unload_gen3_entry(hass, entry) + _LOGGER.debug("Unloading SPAN Panel integration") # Reset span-panel-api delay function to default diff --git a/custom_components/span_panel/binary_sensor.py b/custom_components/span_panel/binary_sensor.py index a46aaf6..0698fb5 100644 --- a/custom_components/span_panel/binary_sensor.py +++ b/custom_components/span_panel/binary_sensor.py @@ -5,7 +5,7 @@ from collections.abc import Callable from dataclasses import dataclass import logging -from typing import Any, Generic, TypeVar +from typing import Any from homeassistant.components.binary_sensor import ( BinarySensorDeviceClass, @@ -20,6 +20,7 @@ from .const import ( CONF_DEVICE_NAME, + CONF_PANEL_GEN, COORDINATOR, DOMAIN, PANEL_STATUS, @@ -104,11 +105,9 @@ class SpanPanelBinarySensorEntityDescription( ), ) -T = TypeVar("T", bound=SpanPanelBinarySensorEntityDescription) - -class SpanPanelBinarySensor( - CoordinatorEntity[SpanPanelCoordinator], BinarySensorEntity, Generic[T] +class SpanPanelBinarySensor[T: SpanPanelBinarySensorEntityDescription]( + CoordinatorEntity[SpanPanelCoordinator], BinarySensorEntity ): """Binary Sensor status entity.""" @@ -258,6 +257,16 @@ async def async_setup_entry( ) -> None: """Set up status sensor platform.""" + # Gen3 path — use Gen3 binary sensor factory + if config_entry.data.get(CONF_PANEL_GEN) == "gen3": + from .gen3.binary_sensors import ( # pylint: disable=import-outside-toplevel + create_gen3_binary_sensors, # noqa: E402 + ) + + gen3_coordinator = hass.data[DOMAIN][config_entry.entry_id][COORDINATOR] + async_add_entities(create_gen3_binary_sensors(gen3_coordinator)) + return + _LOGGER.debug("ASYNC SETUP ENTRY BINARYSENSOR") data: dict[str, Any] = hass.data[DOMAIN][config_entry.entry_id] diff --git a/custom_components/span_panel/config_flow.py b/custom_components/span_panel/config_flow.py index 4036d09..a1835aa 100644 --- a/custom_components/span_panel/config_flow.py +++ b/custom_components/span_panel/config_flow.py @@ -54,6 +54,7 @@ CONF_API_RETRIES, CONF_API_RETRY_BACKOFF_MULTIPLIER, CONF_API_RETRY_TIMEOUT, + CONF_PANEL_GEN, CONF_SIMULATION_CONFIG, CONF_SIMULATION_OFFLINE_MINUTES, CONF_SIMULATION_START_TIME, @@ -280,8 +281,22 @@ async def async_step_user(self, user_input: dict[str, Any] | None = None) -> Con use_ssl: bool = user_input.get(CONF_USE_SSL, False) - # Validate host before setting up flow + # Validate host before setting up flow (Gen2 REST API) if not await validate_host(self.hass, host, use_ssl=use_ssl): + # REST failed — try Gen3 gRPC as fallback + if await self._test_gen3_connection(host): + _LOGGER.info("Gen3 panel detected at %s (REST unavailable, gRPC OK)", host) + # Gen3 panels don't need auth — create entry directly + await self.async_set_unique_id(host) + self._abort_if_unique_id_configured() + return self.async_create_entry( + title=f"SPAN Panel ({host})", + data={ + CONF_HOST: host, + CONF_PANEL_GEN: "gen3", + }, + ) + return self.async_show_form( step_id="user", data_schema=STEP_USER_DATA_SCHEMA, @@ -298,6 +313,19 @@ async def async_step_user(self, user_input: dict[str, Any] | None = None) -> Con return await self.async_step_choose_auth_type() + async def _test_gen3_connection(self, host: str) -> bool: + """Test if the host is a Gen3 panel via gRPC on port 50065.""" + try: + from .gen3.span_grpc_client import ( # pylint: disable=import-outside-toplevel + SpanGrpcClient, # noqa: E402 + ) + + client = SpanGrpcClient(host) + return await client.test_connection() + except Exception: + _LOGGER.debug("Gen3 gRPC connection test failed for %s", host) + return False + async def _handle_simulator_setup(self, user_input: dict[str, Any]) -> ConfigFlowResult: """Handle simulator mode setup.""" # Precision settings already stored in async_step_user diff --git a/custom_components/span_panel/const.py b/custom_components/span_panel/const.py index 57b5d14..d43aadf 100644 --- a/custom_components/span_panel/const.py +++ b/custom_components/span_panel/const.py @@ -12,6 +12,9 @@ CONF_USE_SSL = "use_ssl" CONF_DEVICE_NAME = "device_name" +# Gen3 gRPC panel generation detection +CONF_PANEL_GEN = "panel_generation" + # Simulation configuration CONF_SIMULATION_CONFIG = "simulation_config" CONF_SIMULATION_START_TIME = "simulation_start_time" diff --git a/custom_components/span_panel/gen3/__init__.py b/custom_components/span_panel/gen3/__init__.py new file mode 100644 index 0000000..1d49d07 --- /dev/null +++ b/custom_components/span_panel/gen3/__init__.py @@ -0,0 +1 @@ +"""Gen3 gRPC support for Span panels (MAIN 40 / MLO 48).""" diff --git a/custom_components/span_panel/gen3/binary_sensors.py b/custom_components/span_panel/gen3/binary_sensors.py new file mode 100644 index 0000000..1f72ba0 --- /dev/null +++ b/custom_components/span_panel/gen3/binary_sensors.py @@ -0,0 +1,80 @@ +"""Binary sensor entities for Gen3 Span panels. + +Provides breaker ON/OFF state detection for each circuit based on +voltage threshold. A breaker is considered ON if its voltage exceeds +5V (5000 mV). +""" + +from __future__ import annotations + +import logging + +from homeassistant.components.binary_sensor import ( + BinarySensorDeviceClass, + BinarySensorEntity, +) +from homeassistant.helpers.device_registry import DeviceInfo +from homeassistant.helpers.update_coordinator import CoordinatorEntity + +from custom_components.span_panel.const import DOMAIN + +from .coordinator import SpanGen3Coordinator +from .span_grpc_client import PanelData + +_LOGGER = logging.getLogger(__name__) + + +def create_gen3_binary_sensors( + coordinator: SpanGen3Coordinator, +) -> list[BinarySensorEntity]: + """Create all Gen3 binary sensor entities for the panel.""" + host = coordinator.config_entry.data["host"] + data: PanelData = coordinator.data + entities: list[BinarySensorEntity] = [] + + for circuit_id in data.circuits: + entities.append(SpanGen3BreakerSensor(coordinator, host, circuit_id)) + + return entities + + +class SpanGen3BreakerSensor(CoordinatorEntity[SpanGen3Coordinator], BinarySensorEntity): + """Binary sensor for breaker state (ON/OFF based on voltage).""" + + _attr_has_entity_name = True + _attr_device_class: BinarySensorDeviceClass | None = BinarySensorDeviceClass.POWER + + def __init__( + self, + coordinator: SpanGen3Coordinator, + host: str, + circuit_id: int, + ) -> None: + """Initialize the sensor.""" + super().__init__(coordinator) + self._host = host + self._circuit_id = circuit_id + self._attr_unique_id = f"{host}_gen3_circuit_{circuit_id}_breaker" + self._attr_name = "Breaker" + + @property + def device_info(self) -> DeviceInfo: + """Return device info — circuit sub-device.""" + info = self.coordinator.data.circuits.get(self._circuit_id) + name = info.name if info else f"Circuit {self._circuit_id}" + return DeviceInfo( + identifiers={(DOMAIN, f"{self._host}_circuit_{self._circuit_id}")}, + name=name, + manufacturer="Span", + model="Circuit Breaker", + via_device=(DOMAIN, self._host), + ) + + @property + def is_on(self) -> bool | None: + """Return true if breaker is ON (voltage present).""" + m = self.coordinator.data.metrics.get(self._circuit_id) + if m is None: + return None + is_on: bool = m.is_on + return is_on diff --git a/custom_components/span_panel/gen3/const.py b/custom_components/span_panel/gen3/const.py new file mode 100644 index 0000000..d6d160a --- /dev/null +++ b/custom_components/span_panel/gen3/const.py @@ -0,0 +1,33 @@ +"""Constants for Gen3 Span panel gRPC support.""" + +# Configuration key to distinguish Gen2 (REST) from Gen3 (gRPC) +CONF_PANEL_GEN = "panel_generation" +GEN2 = "gen2" +GEN3 = "gen3" + +# gRPC connection +DEFAULT_GRPC_PORT = 50065 +GRPC_SERVICE_PATH = "/io.span.panel.protocols.traithandler.TraitHandlerService" + +# Trait IDs +TRAIT_BREAKER_GROUPS = 15 +TRAIT_CIRCUIT_NAMES = 16 +TRAIT_BREAKER_CONFIG = 17 +TRAIT_POWER_METRICS = 26 +TRAIT_RELAY_STATE = 27 +TRAIT_BREAKER_PARAMS = 31 + +# Vendor/Product IDs +VENDOR_SPAN = 1 +PRODUCT_GEN3_PANEL = 4 +PRODUCT_GEN3_GATEWAY = 5 + +# Metric IID offset: circuit N -> metric IID = N + 27 +METRIC_IID_OFFSET = 27 + +# Main feed IID (always 1 for trait 26) +MAIN_FEED_IID = 1 + +# Voltage threshold for breaker state detection (millivolts) +# Below this = breaker OFF +BREAKER_OFF_VOLTAGE_MV = 5000 # 5V diff --git a/custom_components/span_panel/gen3/coordinator.py b/custom_components/span_panel/gen3/coordinator.py new file mode 100644 index 0000000..e7d5093 --- /dev/null +++ b/custom_components/span_panel/gen3/coordinator.py @@ -0,0 +1,83 @@ +"""Data coordinator for Gen3 Span panels. + +Wraps the push-based gRPC streaming client in Home Assistant's standard +DataUpdateCoordinator pattern. This gives Gen3 the same +``coordinator.data`` interface that entities expect, while receiving +real-time updates from the gRPC stream rather than polling. +""" + +from __future__ import annotations + +from datetime import timedelta +import logging + +from homeassistant.config_entries import ConfigEntry +from homeassistant.core import HomeAssistant, callback +from homeassistant.helpers.update_coordinator import DataUpdateCoordinator + +from .const import DEFAULT_GRPC_PORT +from .span_grpc_client import PanelData, SpanGrpcClient + +_LOGGER = logging.getLogger(__name__) + +# Fallback poll interval — the gRPC stream pushes data, but +# DataUpdateCoordinator requires an interval. Set to a long value +# since real updates come from the stream callback. +_FALLBACK_INTERVAL = timedelta(seconds=300) + + +class SpanGen3Coordinator(DataUpdateCoordinator[PanelData]): + """Coordinator for Gen3 Span panels using gRPC streaming.""" + + def __init__(self, hass: HomeAssistant, entry: ConfigEntry) -> None: + """Initialize the coordinator.""" + super().__init__( + hass, + _LOGGER, + name=f"Span Gen3 ({entry.data.get('host', 'unknown')})", + update_interval=_FALLBACK_INTERVAL, + ) + self.config_entry = entry + self._client = SpanGrpcClient( + host=entry.data["host"], + port=entry.data.get("port", DEFAULT_GRPC_PORT), + ) + + @property + def client(self) -> SpanGrpcClient: + """Return the gRPC client.""" + return self._client + + async def async_setup(self) -> bool: + """Connect to the panel and start streaming.""" + if not await self._client.connect(): + return False + + # Wire up the gRPC stream callback to DataUpdateCoordinator + self._client.register_callback(self._on_data_update) + + # Seed the coordinator with initial data + self.async_set_updated_data(self._client.data) + + # Start the metric stream + await self._client.start_streaming() + return True + + async def async_shutdown(self) -> None: + """Stop streaming and disconnect.""" + await self._client.stop_streaming() + await self._client.disconnect() + + @callback + def _on_data_update(self) -> None: + """Handle data update from gRPC stream. + + Called by the gRPC client whenever new metrics arrive. Pushes + the latest PanelData into the DataUpdateCoordinator, which + triggers entity state writes. + """ + self.async_set_updated_data(self._client.data) + + async def _async_update_data(self) -> PanelData: + """Fallback for manual refresh — return cached data from stream.""" + return self._client.data diff --git a/custom_components/span_panel/gen3/sensors.py b/custom_components/span_panel/gen3/sensors.py new file mode 100644 index 0000000..bdfaac0 --- /dev/null +++ b/custom_components/span_panel/gen3/sensors.py @@ -0,0 +1,304 @@ +"""Sensor entities for Gen3 Span panels. + +Creates power, voltage, current, and frequency sensors for both +the main feed and individual circuits. Uses CoordinatorEntity + +SensorEntity following standard HA patterns. +""" + +from __future__ import annotations + +import logging + +from homeassistant.components.sensor import ( + SensorDeviceClass, + SensorEntity, + SensorStateClass, +) +from homeassistant.const import ( + UnitOfElectricCurrent, + UnitOfElectricPotential, + UnitOfFrequency, + UnitOfPower, +) +from homeassistant.helpers.device_registry import DeviceInfo +from homeassistant.helpers.update_coordinator import CoordinatorEntity + +from custom_components.span_panel.const import DOMAIN + +from .coordinator import SpanGen3Coordinator +from .span_grpc_client import PanelData + +_LOGGER = logging.getLogger(__name__) + + +def create_gen3_sensors( + coordinator: SpanGen3Coordinator, +) -> list[SensorEntity]: + """Create all Gen3 sensor entities for the panel. + + Returns a flat list of main feed sensors + per-circuit sensors. + """ + host = coordinator.config_entry.data["host"] + entities: list[SensorEntity] = [] + + # Main feed sensors + entities.extend( + [ + SpanGen3MainPowerSensor(coordinator, host), + SpanGen3MainVoltageSensor(coordinator, host), + SpanGen3MainCurrentSensor(coordinator, host), + SpanGen3MainFrequencySensor(coordinator, host), + ] + ) + + # Per-circuit sensors + data: PanelData = coordinator.data + for circuit_id in data.circuits: + entities.extend( + [ + SpanGen3CircuitPowerSensor(coordinator, host, circuit_id), + SpanGen3CircuitVoltageSensor(coordinator, host, circuit_id), + SpanGen3CircuitCurrentSensor(coordinator, host, circuit_id), + SpanGen3CircuitPositionSensor(coordinator, host, circuit_id), + ] + ) + + return entities + + +# --------------------------------------------------------------------------- +# Base classes +# --------------------------------------------------------------------------- + + +class SpanGen3SensorBase(CoordinatorEntity[SpanGen3Coordinator], SensorEntity): + """Base class for Gen3 sensors.""" + + _attr_has_entity_name = True + _attr_state_class: str | None = SensorStateClass.MEASUREMENT + + def __init__(self, coordinator: SpanGen3Coordinator, host: str) -> None: + """Initialize the sensor.""" + super().__init__(coordinator) + self._host = host + + @property + def device_info(self) -> DeviceInfo: + """Return device info for the main panel device.""" + return DeviceInfo( + identifiers={(DOMAIN, self._host)}, + name="SPAN Panel", + manufacturer="Span", + model="Gen3", + ) + + +class SpanGen3CircuitSensorBase(SpanGen3SensorBase): + """Base class for per-circuit Gen3 sensors.""" + + def __init__( + self, + coordinator: SpanGen3Coordinator, + host: str, + circuit_id: int, + ) -> None: + """Initialize the circuit sensor.""" + super().__init__(coordinator, host) + self._circuit_id = circuit_id + + @property + def _circuit_info(self): + """Return circuit info.""" + return self.coordinator.data.circuits.get(self._circuit_id) + + @property + def _circuit_metrics(self): + """Return circuit metrics.""" + return self.coordinator.data.metrics.get(self._circuit_id) + + @property + def device_info(self) -> DeviceInfo: + """Return device info — circuit as sub-device of panel.""" + info = self._circuit_info + name = info.name if info else f"Circuit {self._circuit_id}" + return DeviceInfo( + identifiers={(DOMAIN, f"{self._host}_circuit_{self._circuit_id}")}, + name=name, + manufacturer="Span", + model="Circuit Breaker", + via_device=(DOMAIN, self._host), + ) + + +# --------------------------------------------------------------------------- +# Main feed sensors +# --------------------------------------------------------------------------- + + +class SpanGen3MainPowerSensor(SpanGen3SensorBase): + """Main feed power sensor.""" + + _attr_device_class: SensorDeviceClass | None = SensorDeviceClass.POWER # type: ignore[mutable-override] + _attr_native_unit_of_measurement: str | None = UnitOfPower.WATT + _attr_suggested_display_precision: int | None = 0 + + def __init__(self, coordinator: SpanGen3Coordinator, host: str) -> None: + """Initialize the main feed power sensor.""" + super().__init__(coordinator, host) + self._attr_unique_id = f"{host}_gen3_main_power" + self._attr_name = "Main Feed Power" + + @property + def native_value(self) -> float | None: + """Return the current power reading.""" + m = self.coordinator.data.main_feed + return round(m.power_w, 1) if m else None + + +class SpanGen3MainVoltageSensor(SpanGen3SensorBase): + """Main feed voltage sensor.""" + + _attr_device_class: SensorDeviceClass | None = SensorDeviceClass.VOLTAGE # type: ignore[mutable-override] + _attr_native_unit_of_measurement: str | None = UnitOfElectricPotential.VOLT + _attr_suggested_display_precision: int | None = 1 + + def __init__(self, coordinator: SpanGen3Coordinator, host: str) -> None: + """Initialize the main feed voltage sensor.""" + super().__init__(coordinator, host) + self._attr_unique_id = f"{host}_gen3_main_voltage" + self._attr_name = "Main Feed Voltage" + + @property + def native_value(self) -> float | None: + """Return the current voltage reading.""" + m = self.coordinator.data.main_feed + return round(m.voltage_v, 1) if m else None + + +class SpanGen3MainCurrentSensor(SpanGen3SensorBase): + """Main feed current sensor.""" + + _attr_device_class: SensorDeviceClass | None = SensorDeviceClass.CURRENT # type: ignore[mutable-override] + _attr_native_unit_of_measurement: str | None = UnitOfElectricCurrent.AMPERE + _attr_suggested_display_precision: int | None = 1 + + def __init__(self, coordinator: SpanGen3Coordinator, host: str) -> None: + """Initialize the main feed current sensor.""" + super().__init__(coordinator, host) + self._attr_unique_id = f"{host}_gen3_main_current" + self._attr_name = "Main Feed Current" + + @property + def native_value(self) -> float | None: + """Return the current amperage reading.""" + m = self.coordinator.data.main_feed + return round(m.current_a, 1) if m else None + + +class SpanGen3MainFrequencySensor(SpanGen3SensorBase): + """Main feed frequency sensor.""" + + _attr_device_class: SensorDeviceClass | None = SensorDeviceClass.FREQUENCY # type: ignore[mutable-override] + _attr_native_unit_of_measurement: str | None = UnitOfFrequency.HERTZ + _attr_suggested_display_precision: int | None = 2 + + def __init__(self, coordinator: SpanGen3Coordinator, host: str) -> None: + """Initialize the main feed frequency sensor.""" + super().__init__(coordinator, host) + self._attr_unique_id = f"{host}_gen3_main_frequency" + self._attr_name = "Main Feed Frequency" + + @property + def native_value(self) -> float | None: + """Return the current frequency reading.""" + m = self.coordinator.data.main_feed + return round(m.frequency_hz, 2) if m and m.frequency_hz > 0 else None + + +# --------------------------------------------------------------------------- +# Per-circuit sensors +# --------------------------------------------------------------------------- + + +class SpanGen3CircuitPowerSensor(SpanGen3CircuitSensorBase): + """Per-circuit power sensor.""" + + _attr_device_class: SensorDeviceClass | None = SensorDeviceClass.POWER # type: ignore[mutable-override] + _attr_native_unit_of_measurement: str | None = UnitOfPower.WATT + _attr_suggested_display_precision: int | None = 0 + + def __init__(self, coordinator: SpanGen3Coordinator, host: str, circuit_id: int) -> None: + """Initialize the circuit power sensor.""" + super().__init__(coordinator, host, circuit_id) + self._attr_unique_id = f"{host}_gen3_circuit_{circuit_id}_power" + self._attr_name = "Power" + + @property + def native_value(self) -> float | None: + """Return the current power reading.""" + m = self._circuit_metrics + return round(m.power_w, 1) if m else None + + +class SpanGen3CircuitVoltageSensor(SpanGen3CircuitSensorBase): + """Per-circuit voltage sensor.""" + + _attr_device_class: SensorDeviceClass | None = SensorDeviceClass.VOLTAGE # type: ignore[mutable-override] + _attr_native_unit_of_measurement: str | None = UnitOfElectricPotential.VOLT + _attr_suggested_display_precision: int | None = 1 + + def __init__(self, coordinator: SpanGen3Coordinator, host: str, circuit_id: int) -> None: + """Initialize the circuit voltage sensor.""" + super().__init__(coordinator, host, circuit_id) + self._attr_unique_id = f"{host}_gen3_circuit_{circuit_id}_voltage" + self._attr_name = "Voltage" + + @property + def native_value(self) -> float | None: + """Return the current voltage reading.""" + m = self._circuit_metrics + return round(m.voltage_v, 1) if m else None + + +class SpanGen3CircuitCurrentSensor(SpanGen3CircuitSensorBase): + """Per-circuit current sensor.""" + + _attr_device_class: SensorDeviceClass | None = SensorDeviceClass.CURRENT # type: ignore[mutable-override] + _attr_native_unit_of_measurement: str | None = UnitOfElectricCurrent.AMPERE + _attr_suggested_display_precision: int | None = 2 + + def __init__(self, coordinator: SpanGen3Coordinator, host: str, circuit_id: int) -> None: + """Initialize the circuit current sensor.""" + super().__init__(coordinator, host, circuit_id) + self._attr_unique_id = f"{host}_gen3_circuit_{circuit_id}_current" + self._attr_name = "Current" + + @property + def native_value(self) -> float | None: + """Return the current amperage reading.""" + m = self._circuit_metrics + return round(m.current_a, 3) if m else None + + +class SpanGen3CircuitPositionSensor(SpanGen3CircuitSensorBase): + """Per-circuit panel position (breaker slot number) sensor.""" + + _attr_icon: str | None = "mdi:electric-switch" + _attr_state_class: str | None = ( + None # Static configuration value, not a time-series measurement + ) + + def __init__(self, coordinator: SpanGen3Coordinator, host: str, circuit_id: int) -> None: + """Initialize the circuit position sensor.""" + super().__init__(coordinator, host, circuit_id) + self._attr_unique_id = f"{host}_gen3_circuit_{circuit_id}_position" + self._attr_name = "Panel Position" + + @property + def native_value(self) -> int | None: + """Return the breaker slot number.""" + info = self._circuit_info + if info is None: + return None + pos = info.breaker_position + return pos if pos > 0 else None diff --git a/custom_components/span_panel/gen3/span.protoset b/custom_components/span_panel/gen3/span.protoset new file mode 100644 index 0000000..d3c3ad2 Binary files /dev/null and b/custom_components/span_panel/gen3/span.protoset differ diff --git a/custom_components/span_panel/gen3/span_grpc_client.py b/custom_components/span_panel/gen3/span_grpc_client.py new file mode 100644 index 0000000..6dc87a5 --- /dev/null +++ b/custom_components/span_panel/gen3/span_grpc_client.py @@ -0,0 +1,984 @@ +"""gRPC client for Gen3 Span panels (MAIN 40 / MLO 48). + +This module provides local gRPC communication with Gen3 Span smart electrical +panels. Gen3 panels replaced the REST API with gRPC on port 50065. No +authentication is required for local connections. + +The client uses manual protobuf encoding/decoding to avoid requiring generated +stubs, keeping the dependency footprint minimal (only grpcio is needed). +""" + +from __future__ import annotations + +import asyncio +from collections.abc import Callable +import contextlib +from dataclasses import dataclass, field +import logging +import struct +from typing import Any + +import grpc + +from .const import ( + BREAKER_OFF_VOLTAGE_MV, + DEFAULT_GRPC_PORT, + MAIN_FEED_IID, + PRODUCT_GEN3_PANEL, + TRAIT_BREAKER_GROUPS, + TRAIT_CIRCUIT_NAMES, + TRAIT_POWER_METRICS, + VENDOR_SPAN, +) + +_LOGGER = logging.getLogger(__name__) + +# gRPC method paths +_SVC = "/io.span.panel.protocols.traithandler.TraitHandlerService" +_GET_INSTANCES = f"{_SVC}/GetInstances" +_SUBSCRIBE = f"{_SVC}/Subscribe" +_GET_REVISION = f"{_SVC}/GetRevision" + + +@dataclass +class CircuitInfo: + """Information about a circuit discovered from trait instances.""" + + circuit_id: int + name: str + metric_iid: int + name_iid: int = 0 + is_dual_phase: bool = False + breaker_position: int = 0 + + +@dataclass +class CircuitMetrics: + """Real-time metrics for a circuit from the gRPC stream.""" + + power_w: float = 0.0 + voltage_v: float = 0.0 + current_a: float = 0.0 + apparent_power_va: float = 0.0 + reactive_power_var: float = 0.0 + frequency_hz: float = 0.0 + power_factor: float = 0.0 + is_on: bool = True + # Dual-phase legs + voltage_a_v: float = 0.0 + voltage_b_v: float = 0.0 + current_a_a: float = 0.0 + current_b_a: float = 0.0 + + +@dataclass +class PanelData: + """Aggregated panel data from gRPC discovery and streaming.""" + + serial: str = "" + firmware: str = "" + panel_resource_id: str = "" + circuits: dict[int, CircuitInfo] = field(default_factory=dict) + metrics: dict[int, CircuitMetrics] = field(default_factory=dict) + main_feed: CircuitMetrics = field(default_factory=CircuitMetrics) + # Reverse lookup: metric IID → circuit_id (built during discovery) + metric_iid_to_circuit: dict[int, int] = field(default_factory=dict) + # BreakerGroups trait 15 instance IIDs — populated in _parse_instances + breaker_group_iids: list[int] = field(default_factory=list) + + +# --------------------------------------------------------------------------- +# Protobuf helpers — manual varint/field parsing +# --------------------------------------------------------------------------- + + +def _decode_varint(data: bytes, offset: int) -> tuple[int, int]: + """Decode a protobuf varint, return (value, new_offset).""" + result = 0 + shift = 0 + while offset < len(data): + b = data[offset] + offset += 1 + result |= (b & 0x7F) << shift + if not (b & 0x80): + break + shift += 7 + return result, offset + + +def _parse_protobuf_fields(data: bytes) -> dict[int, list[Any]]: + """Parse raw protobuf bytes into a dict of field_number -> [values].""" + fields: dict[int, list[Any]] = {} + offset = 0 + while offset < len(data): + tag, offset = _decode_varint(data, offset) + field_num = tag >> 3 + wire_type = tag & 0x07 + + value: int | bytes + if wire_type == 0: # varint + value, offset = _decode_varint(data, offset) + elif wire_type == 1: # 64-bit + if offset + 8 > len(data): + break + value = struct.unpack_from(" len(data): + break + value = data[offset : offset + length] + offset += length + elif wire_type == 5: # 32-bit + if offset + 4 > len(data): + break + value = struct.unpack_from(" Any: + """Get first value for a field number.""" + vals = fields.get(num) + return vals[0] if vals else default + + +def _parse_min_max_avg(data: bytes) -> dict[str, float]: + """Parse a min/max/avg sub-message (fields 1/2/3).""" + fields = _parse_protobuf_fields(data) + return { + "min": _get_field(fields, 1, 0), + "max": _get_field(fields, 2, 0), + "avg": _get_field(fields, 3, 0), + } + + +# --------------------------------------------------------------------------- +# Metric decoders — single-phase, dual-phase, and main feed +# --------------------------------------------------------------------------- + + +def _decode_single_phase(data: bytes) -> CircuitMetrics: + """Decode single-phase (120V) metrics from protobuf field 11.""" + fields = _parse_protobuf_fields(data) + metrics = CircuitMetrics() + + current_data = _get_field(fields, 1) + if current_data and isinstance(current_data, bytes): + current = _parse_min_max_avg(current_data) + metrics.current_a = current["avg"] / 1000.0 + + voltage_data = _get_field(fields, 2) + if voltage_data and isinstance(voltage_data, bytes): + voltage = _parse_min_max_avg(voltage_data) + metrics.voltage_v = voltage["avg"] / 1000.0 + + power_data = _get_field(fields, 3) + if power_data and isinstance(power_data, bytes): + power = _parse_min_max_avg(power_data) + metrics.power_w = power["avg"] / 2000.0 + + apparent_data = _get_field(fields, 4) + if apparent_data and isinstance(apparent_data, bytes): + apparent = _parse_min_max_avg(apparent_data) + metrics.apparent_power_va = apparent["avg"] / 2000.0 + + reactive_data = _get_field(fields, 5) + if reactive_data and isinstance(reactive_data, bytes): + reactive = _parse_min_max_avg(reactive_data) + metrics.reactive_power_var = reactive["avg"] / 2000.0 + + metrics.is_on = (metrics.voltage_v * 1000) > BREAKER_OFF_VOLTAGE_MV + return metrics + + +def _decode_dual_phase(data: bytes) -> CircuitMetrics: + """Decode dual-phase (240V) metrics from protobuf field 12.""" + fields = _parse_protobuf_fields(data) + metrics = CircuitMetrics() + + # Leg A (field 1) + leg_a_data = _get_field(fields, 1) + if leg_a_data and isinstance(leg_a_data, bytes): + leg_a = _parse_protobuf_fields(leg_a_data) + current_data = _get_field(leg_a, 1) + if current_data and isinstance(current_data, bytes): + metrics.current_a_a = _parse_min_max_avg(current_data)["avg"] / 1000.0 + voltage_data = _get_field(leg_a, 2) + if voltage_data and isinstance(voltage_data, bytes): + metrics.voltage_a_v = _parse_min_max_avg(voltage_data)["avg"] / 1000.0 + + # Leg B (field 2) + leg_b_data = _get_field(fields, 2) + if leg_b_data and isinstance(leg_b_data, bytes): + leg_b = _parse_protobuf_fields(leg_b_data) + current_data = _get_field(leg_b, 1) + if current_data and isinstance(current_data, bytes): + metrics.current_b_a = _parse_min_max_avg(current_data)["avg"] / 1000.0 + voltage_data = _get_field(leg_b, 2) + if voltage_data and isinstance(voltage_data, bytes): + metrics.voltage_b_v = _parse_min_max_avg(voltage_data)["avg"] / 1000.0 + + # Combined (field 3) + combined_data = _get_field(fields, 3) + if combined_data and isinstance(combined_data, bytes): + combined = _parse_protobuf_fields(combined_data) + voltage_data = _get_field(combined, 2) + if voltage_data and isinstance(voltage_data, bytes): + metrics.voltage_v = _parse_min_max_avg(voltage_data)["avg"] / 1000.0 + power_data = _get_field(combined, 3) + if power_data and isinstance(power_data, bytes): + metrics.power_w = _parse_min_max_avg(power_data)["avg"] / 2000.0 + apparent_data = _get_field(combined, 4) + if apparent_data and isinstance(apparent_data, bytes): + metrics.apparent_power_va = _parse_min_max_avg(apparent_data)["avg"] / 2000.0 + reactive_data = _get_field(combined, 5) + if reactive_data and isinstance(reactive_data, bytes): + metrics.reactive_power_var = _parse_min_max_avg(reactive_data)["avg"] / 2000.0 + pf_data = _get_field(combined, 6) + if pf_data and isinstance(pf_data, bytes): + pf = _parse_min_max_avg(pf_data) + metrics.power_factor = pf["avg"] / 2000.0 + + # Frequency (field 4) + freq_data = _get_field(fields, 4) + if freq_data and isinstance(freq_data, bytes): + freq = _parse_min_max_avg(freq_data) + metrics.frequency_hz = freq["avg"] / 1000.0 + + # Total current = leg A + leg B + metrics.current_a = metrics.current_a_a + metrics.current_b_a + + metrics.is_on = (metrics.voltage_v * 1000) > BREAKER_OFF_VOLTAGE_MV + return metrics + + +def _extract_deepest_value(data: bytes, target_field: int = 3) -> int: + """Extract the deepest varint from nested protobuf. + + Recursively searches for the largest non-zero value at the target field + within nested sub-messages. + """ + fields = _parse_protobuf_fields(data) + best = 0 + + for fn, vals in fields.items(): + for v in vals: + if isinstance(v, bytes) and len(v) > 0: + inner = _extract_deepest_value(v, target_field) + if inner > best: + best = inner + elif not isinstance(v, bytes) and fn == target_field: + if v > best: + best = v + return best + + +def _decode_main_feed(data: bytes) -> CircuitMetrics: + """Decode main feed metrics from protobuf field 14. + + Field 14 has deeper nesting than circuit fields 11/12. The structure: + 14.1 = primary data block (leg A) + 14.2 = secondary data block (leg B) + Each leg: {1: current stats, 2: voltage stats, 3: power stats, 4: frequency} + """ + fields = _parse_protobuf_fields(data) + main_data = _get_field(fields, 14) + if not main_data or not isinstance(main_data, bytes): + return CircuitMetrics() + + metrics = CircuitMetrics() + main_fields = _parse_protobuf_fields(main_data) + + # Extract from primary data block (field 1 = leg A) + leg_a = _get_field(main_fields, 1) + if leg_a and isinstance(leg_a, bytes): + la_fields = _parse_protobuf_fields(leg_a) + + power_stats = _get_field(la_fields, 3) + if power_stats and isinstance(power_stats, bytes): + metrics.power_w = _extract_deepest_value(power_stats) / 2000.0 + + voltage_stats = _get_field(la_fields, 2) + if voltage_stats and isinstance(voltage_stats, bytes): + vs_fields = _parse_protobuf_fields(voltage_stats) + f2 = _get_field(vs_fields, 2) + if f2 and isinstance(f2, bytes): + inner = _parse_protobuf_fields(f2) + v = _get_field(inner, 3, 0) + if isinstance(v, int) and v > 0: + metrics.voltage_a_v = v / 1000.0 + + freq_stats = _get_field(la_fields, 4) + if freq_stats and isinstance(freq_stats, bytes): + freq_fields = _parse_protobuf_fields(freq_stats) + freq_val = _get_field(freq_fields, 3, 0) + if isinstance(freq_val, int) and freq_val > 0: + metrics.frequency_hz = freq_val / 1000.0 + + # Leg B data (field 2) + leg_b = _get_field(main_fields, 2) + if leg_b and isinstance(leg_b, bytes): + lb_fields = _parse_protobuf_fields(leg_b) + power_stats = _get_field(lb_fields, 3) + if power_stats and isinstance(power_stats, bytes): + lb_power = _extract_deepest_value(power_stats) / 2000.0 + if lb_power > 0: + metrics.power_w += lb_power + voltage_stats = _get_field(lb_fields, 2) + if voltage_stats and isinstance(voltage_stats, bytes): + vs_fields = _parse_protobuf_fields(voltage_stats) + f2 = _get_field(vs_fields, 2) + if f2 and isinstance(f2, bytes): + inner = _parse_protobuf_fields(f2) + v = _get_field(inner, 3, 0) + if isinstance(v, int) and v > 0: + metrics.voltage_b_v = v / 1000.0 + + # Combined voltage (split-phase: leg A + leg B, or 2x leg A) + if metrics.voltage_b_v > 0: + metrics.voltage_v = metrics.voltage_a_v + metrics.voltage_b_v + else: + metrics.voltage_v = metrics.voltage_a_v * 2 # Assume symmetric + + # Derive current from power and voltage + if metrics.voltage_v > 0: + metrics.current_a = metrics.power_w / metrics.voltage_v + + metrics.is_on = True + return metrics + + +# --------------------------------------------------------------------------- +# Protobuf encoding helpers +# --------------------------------------------------------------------------- + + +def _encode_varint(value: int) -> bytes: + """Encode an integer as a protobuf varint.""" + parts = [] + while value > 0x7F: + parts.append((value & 0x7F) | 0x80) + value >>= 7 + parts.append(value & 0x7F) + return bytes(parts) if parts else b"\x00" + + +def _encode_varint_field(field_num: int, value: int) -> bytes: + """Encode a varint field (tag + value).""" + tag = (field_num << 3) | 0 # wire type 0 = varint + return _encode_varint(tag) + _encode_varint(value) + + +def _encode_bytes_field(field_num: int, value: bytes) -> bytes: + """Encode a length-delimited field (tag + length + value).""" + tag = (field_num << 3) | 2 # wire type 2 = length-delimited + return _encode_varint(tag) + _encode_varint(len(value)) + value + + +def _encode_string_field(field_num: int, value: str) -> bytes: + """Encode a string field (tag + length + utf-8 bytes).""" + return _encode_bytes_field(field_num, value.encode("utf-8")) + + +# --------------------------------------------------------------------------- +# gRPC Client +# --------------------------------------------------------------------------- + + +class SpanGrpcClient: + """gRPC client for Gen3 Span panels. + + Connects to the panel's TraitHandlerService on port 50065 (no auth). + Discovers circuits via GetInstances, fetches names via GetRevision, + and streams real-time power metrics via Subscribe. + """ + + def __init__(self, host: str, port: int = DEFAULT_GRPC_PORT) -> None: + """Initialize the client.""" + self._host = host + self._port = port + self._channel: grpc.aio.Channel | None = None + self._stream_task: asyncio.Task | None = None + self._data = PanelData() + self._callbacks: list[Callable[[], None]] = [] + self._connected = False + + @property + def data(self) -> PanelData: + """Return current panel data.""" + return self._data + + @property + def connected(self) -> bool: + """Return connection status.""" + return self._connected + + def register_callback(self, callback: Callable[[], None]) -> Callable[[], None]: + """Register a callback for data updates. Returns unregister function.""" + self._callbacks.append(callback) + return lambda: self._callbacks.remove(callback) + + def _notify(self) -> None: + """Notify all registered callbacks.""" + for cb in self._callbacks: + try: + cb() + except Exception: + _LOGGER.exception("Error in callback") + + async def connect(self) -> bool: + """Connect to the panel and fetch initial data.""" + try: + self._channel = grpc.aio.insecure_channel( + f"{self._host}:{self._port}", + options=[ + ("grpc.keepalive_time_ms", 30000), + ("grpc.keepalive_timeout_ms", 10000), + ("grpc.keepalive_permit_without_calls", True), + ], + ) + await self._fetch_instances() + await self._fetch_breaker_groups() + await self._fetch_circuit_names() + self._connected = True + with_pos = sum(1 for c in self._data.circuits.values() if c.breaker_position > 0) + _LOGGER.info( + "Connected to Gen3 panel at %s:%s — %d circuits discovered, " + "%d with breaker positions", + self._host, + self._port, + len(self._data.circuits), + with_pos, + ) + return True + except Exception: + _LOGGER.exception("Failed to connect to Gen3 panel at %s:%s", self._host, self._port) + self._connected = False + return False + + async def disconnect(self) -> None: + """Disconnect from the panel.""" + self._connected = False + if self._stream_task and not self._stream_task.done(): + self._stream_task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await self._stream_task + if self._channel: + await self._channel.close() + self._channel = None + + async def start_streaming(self) -> None: + """Start the metric streaming task.""" + if self._stream_task and not self._stream_task.done(): + return + self._stream_task = asyncio.create_task(self._stream_loop()) + + async def stop_streaming(self) -> None: + """Stop the metric streaming task.""" + if self._stream_task and not self._stream_task.done(): + self._stream_task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await self._stream_task + + async def test_connection(self) -> bool: + """Test if we can connect to the panel (static method-like).""" + try: + channel = grpc.aio.insecure_channel( + f"{self._host}:{self._port}", + options=[("grpc.initial_reconnect_backoff_ms", 1000)], + ) + try: + response = await asyncio.wait_for( + channel.unary_unary( + _GET_INSTANCES, + request_serializer=lambda x: x, + response_deserializer=lambda x: x, + )(b""), + timeout=5.0, + ) + return len(response) > 0 + finally: + await channel.close() + except Exception: + return False + + # ------------------------------------------------------------------ + # Instance discovery + # ------------------------------------------------------------------ + + async def _fetch_instances(self) -> None: + """Fetch all trait instances to discover circuits.""" + assert self._channel is not None + response = await self._channel.unary_unary( + _GET_INSTANCES, + request_serializer=lambda x: x, + response_deserializer=lambda x: x, + )(b"") + self._parse_instances(response) + + def _parse_instances(self, data: bytes) -> None: + """Parse GetInstancesResponse to discover circuits and panel info. + + Collects trait 26 (PowerMetrics) instance IDs as circuit entries, + and trait 15 (BreakerGroups) IIDs for later BreakerGroups resolution. + Circuit names and breaker positions are resolved in subsequent steps + (_fetch_breaker_groups, _fetch_circuit_names) rather than here. + """ + fields = _parse_protobuf_fields(data) + items = fields.get(1, []) + + metric_iids: list[int] = [] # Trait 26 instance IDs (excl main feed) + + for item_data in items: + if not isinstance(item_data, bytes): + continue + item_fields = _parse_protobuf_fields(item_data) + + trait_info_data = _get_field(item_fields, 1) + if not trait_info_data or not isinstance(trait_info_data, bytes): + continue + + trait_info_fields = _parse_protobuf_fields(trait_info_data) + + external_data = _get_field(trait_info_fields, 2) + if not external_data or not isinstance(external_data, bytes): + continue + + ext_fields = _parse_protobuf_fields(external_data) + + # resource_id (field 1) + resource_data = _get_field(ext_fields, 1) + resource_id_str = "" + if resource_data and isinstance(resource_data, bytes): + rid_fields = _parse_protobuf_fields(resource_data) + rid_val = _get_field(rid_fields, 1) + if rid_val and isinstance(rid_val, bytes): + resource_id_str = rid_val.decode("utf-8", errors="replace") + + # trait_info (field 2) + inner_info = _get_field(ext_fields, 2) + if not inner_info or not isinstance(inner_info, bytes): + continue + + inner_fields = _parse_protobuf_fields(inner_info) + + meta_data = _get_field(inner_fields, 1) + if not meta_data or not isinstance(meta_data, bytes): + continue + + meta_fields = _parse_protobuf_fields(meta_data) + vendor_id = _get_field(meta_fields, 1, 0) + product_id = _get_field(meta_fields, 2, 0) + trait_id = _get_field(meta_fields, 3, 0) + + instance_data = _get_field(inner_fields, 2) + instance_id = 0 + if instance_data and isinstance(instance_data, bytes): + iid_fields = _parse_protobuf_fields(instance_data) + instance_id = _get_field(iid_fields, 1, 0) + + # Capture panel resource_id + if ( + product_id == PRODUCT_GEN3_PANEL + and resource_id_str + and not self._data.panel_resource_id + ): + self._data.panel_resource_id = resource_id_str + + if vendor_id != VENDOR_SPAN: + continue + + # Trait 15 (BreakerGroups): collect IIDs for Step 2 + if trait_id == TRAIT_BREAKER_GROUPS: + if instance_id not in self._data.breaker_group_iids: + self._data.breaker_group_iids.append(instance_id) + + # Trait 26 (PowerMetrics): each non-main-feed instance is a circuit + if trait_id == TRAIT_POWER_METRICS and instance_id != MAIN_FEED_IID: + metric_iids.append(instance_id) + self._data.circuits[instance_id] = CircuitInfo( + circuit_id=instance_id, + name=f"Circuit {instance_id}", + metric_iid=instance_id, + ) + self._data.metric_iid_to_circuit[instance_id] = instance_id + + _LOGGER.debug( + "Discovered %d metric instances (trait 26, excl main feed) and " + "%d breaker group instances (trait 15). Metric IIDs: %s, " + "BreakerGroup IIDs: %s", + len(metric_iids), + len(self._data.breaker_group_iids), + sorted(metric_iids)[:5], + sorted(self._data.breaker_group_iids)[:5], + ) + + # ------------------------------------------------------------------ + # Breaker group resolution + # ------------------------------------------------------------------ + + async def _fetch_breaker_groups(self) -> None: + """Fetch BreakerGroup mappings (trait 15) to get name_iid and breaker_position. + + For each BreakerGroup IID that matches a PowerMetrics circuit, calls + GetRevision(trait 15) to extract: + - name_iid: the trait 16 IID used to fetch the human-readable circuit name + - breaker_position: the physical slot number (1-48) in the panel + + Circuits with no BreakerGroup mapping (orphans) are removed — they are + system/ghost metrics with no physical breaker. + """ + assert self._channel is not None + for group_iid in self._data.breaker_group_iids: + if group_iid not in self._data.circuits: + # BreakerGroup IID without a matching PowerMetrics instance — skip + continue + request = self._build_get_revision_request( + vendor_id=VENDOR_SPAN, + product_id=PRODUCT_GEN3_PANEL, + trait_id=TRAIT_BREAKER_GROUPS, + instance_id=group_iid, + ) + try: + response = await self._channel.unary_unary( + _GET_REVISION, + request_serializer=lambda x: x, + response_deserializer=lambda x: x, + )(request) + name_id, breaker_pos, is_dual = self._parse_breaker_group(response) + circuit = self._data.circuits[group_iid] + circuit.name_iid = name_id + circuit.is_dual_phase = is_dual + if breaker_pos > 0: + circuit.breaker_position = breaker_pos + _LOGGER.debug( + "BreakerGroup IID %d → name_iid=%d, breaker_position=%d, dual=%s", + group_iid, + name_id, + breaker_pos, + is_dual, + ) + except grpc.aio.AioRpcError: + _LOGGER.debug("Failed to fetch BreakerGroup for IID %d", group_iid) + + # Orphan filtering: circuits with no BreakerGroup mapping are not real + # user circuits (system/ghost metrics) — remove them + orphan_iids = [iid for iid, c in self._data.circuits.items() if c.name_iid == 0] + for iid in orphan_iids: + _LOGGER.debug("Removing orphan circuit IID %d (no BreakerGroup mapping)", iid) + del self._data.circuits[iid] + self._data.metric_iid_to_circuit.pop(iid, None) + + _LOGGER.debug( + "After BreakerGroups resolution: %d circuits remain (%d orphans removed)", + len(self._data.circuits), + len(orphan_iids), + ) + + # ------------------------------------------------------------------ + # Circuit names + # ------------------------------------------------------------------ + + async def _fetch_circuit_names(self) -> None: + """Fetch circuit names from trait 16 via GetRevision. + + Uses each circuit's name_iid (from BreakerGroups resolution) to look + up the human-readable name from trait 16. + """ + for circuit_id, info in list(self._data.circuits.items()): + name_iid = info.name_iid + if not name_iid: + _LOGGER.debug( + "Circuit %d has no name_iid (not resolved via BreakerGroups), " + "skipping name fetch", + circuit_id, + ) + continue + try: + name = await self._get_circuit_name(name_iid) + if name: + info.name = name + _LOGGER.debug( + "Circuit %d (name_iid=%d, metric_iid=%d, breaker_pos=%d): %s", + circuit_id, + name_iid, + info.metric_iid, + info.breaker_position, + name, + ) + except Exception: + _LOGGER.debug( + "Failed to get name for circuit %d (name_iid=%d)", + circuit_id, + name_iid, + ) + + async def _get_circuit_name(self, circuit_id: int) -> str | None: + """Get a single circuit name via GetRevision on trait 16.""" + assert self._channel is not None + request = self._build_get_revision_request( + vendor_id=VENDOR_SPAN, + product_id=PRODUCT_GEN3_PANEL, + trait_id=TRAIT_CIRCUIT_NAMES, + instance_id=circuit_id, + ) + + try: + response = await self._channel.unary_unary( + _GET_REVISION, + request_serializer=lambda x: x, + response_deserializer=lambda x: x, + )(request) + + return self._parse_circuit_name(response) + except grpc.aio.AioRpcError: + return None + + def _build_get_revision_request( + self, vendor_id: int, product_id: int, trait_id: int, instance_id: int + ) -> bytes: + """Build a GetRevisionRequest protobuf message manually.""" + # TraitMetadata (field 1) + meta = _encode_varint_field(1, vendor_id) + meta += _encode_varint_field(2, product_id) + meta += _encode_varint_field(3, trait_id) + meta += _encode_varint_field(4, 1) # version + + # ResourceId message + resource_id_msg = _encode_string_field(1, self._data.panel_resource_id) + + # InstanceMetadata (field 2) + iid_msg = _encode_varint_field(1, instance_id) + instance_meta = _encode_bytes_field(1, resource_id_msg) + instance_meta += _encode_bytes_field(2, iid_msg) + + # RevisionRequest (field 3) + req_metadata = _encode_bytes_field(2, resource_id_msg) + revision_request = _encode_bytes_field(1, req_metadata) + + result = _encode_bytes_field(1, meta) + result += _encode_bytes_field(2, instance_meta) + result += _encode_bytes_field(3, revision_request) + return result + + @staticmethod + def _parse_circuit_name(data: bytes) -> str | None: + """Parse circuit name from GetRevision response.""" + fields = _parse_protobuf_fields(data) + + sr_data = _get_field(fields, 3) + if not sr_data or not isinstance(sr_data, bytes): + return None + + sr_fields = _parse_protobuf_fields(sr_data) + payload_data = _get_field(sr_fields, 2) + if not payload_data or not isinstance(payload_data, bytes): + return None + + pl_fields = _parse_protobuf_fields(payload_data) + raw = _get_field(pl_fields, 1) + if not raw or not isinstance(raw, bytes): + return None + + name_fields = _parse_protobuf_fields(raw) + name = _get_field(name_fields, 4) + if name and isinstance(name, bytes): + decoded: str = name.decode("utf-8", errors="replace").strip() + return decoded + return None + + @staticmethod + def _extract_trait_ref_iid(ref_data: bytes) -> int: + """Extract an IID from a trait reference sub-message. + + Trait references use the structure: field 2 -> field 1 = iid (varint). + Returns 0 if the data cannot be parsed. + """ + if not ref_data or not isinstance(ref_data, bytes): + return 0 + ref_fields = _parse_protobuf_fields(ref_data) + iid_data = _get_field(ref_fields, 2) + if iid_data and isinstance(iid_data, bytes): + iid_fields = _parse_protobuf_fields(iid_data) + val = _get_field(iid_fields, 1, 0) + return val if isinstance(val, int) else 0 + return 0 + + @staticmethod + def _parse_breaker_group(data: bytes) -> tuple[int, int, bool]: + """Parse a BreakerGroups (trait 15) GetRevision response. + + Returns (name_id, breaker_position, is_dual_phase). All zero/False + on failure. + + Single-pole (120V) groups use field 11: + f11.f1 -> CircuitNames ref (f2.f1 = name_id) + f11.f2 -> BreakerConfig ref (f2.f1 = breaker position) + + Dual-pole (240V) groups use field 13: + f13.f1.f1 -> CircuitNames ref (f2.f1 = name_id) + f13.f4 -> BreakerConfig leg A ref (f2.f1 = breaker position A) + """ + fields = _parse_protobuf_fields(data) + sr_data = _get_field(fields, 3) + if not sr_data or not isinstance(sr_data, bytes): + return 0, 0, False + sr_fields = _parse_protobuf_fields(sr_data) + payload_data = _get_field(sr_fields, 2) + if not payload_data or not isinstance(payload_data, bytes): + return 0, 0, False + pl_fields = _parse_protobuf_fields(payload_data) + raw = _get_field(pl_fields, 1) + if not raw or not isinstance(raw, bytes): + return 0, 0, False + + group_fields = _parse_protobuf_fields(raw) + + # Single-pole (field 11) + refs_data = _get_field(group_fields, 11) + if refs_data and isinstance(refs_data, bytes): + refs = _parse_protobuf_fields(refs_data) + name_ref = _get_field(refs, 1) + config_ref = _get_field(refs, 2) + name_id = SpanGrpcClient._extract_trait_ref_iid(name_ref or b"") + brk_pos = SpanGrpcClient._extract_trait_ref_iid(config_ref or b"") + return name_id, brk_pos, False + + # Dual-pole (field 13) + dual_data = _get_field(group_fields, 13) + if dual_data and isinstance(dual_data, bytes): + dual_fields = _parse_protobuf_fields(dual_data) + name_id = 0 + name_wrapper = _get_field(dual_fields, 1) + if name_wrapper and isinstance(name_wrapper, bytes): + wf = _parse_protobuf_fields(name_wrapper) + name_ref = _get_field(wf, 1) + if name_ref and isinstance(name_ref, bytes): + name_id = SpanGrpcClient._extract_trait_ref_iid(name_ref) + leg_a_ref = _get_field(dual_fields, 4) + brk_pos = SpanGrpcClient._extract_trait_ref_iid(leg_a_ref or b"") + return name_id, brk_pos, True + + return 0, 0, False + + # ------------------------------------------------------------------ + # Metric streaming + # ------------------------------------------------------------------ + + async def _stream_loop(self) -> None: + """Run the main streaming loop with automatic reconnection.""" + while self._connected: + try: + await self._subscribe_stream() + except asyncio.CancelledError: + return + except Exception: + _LOGGER.exception("Stream error, reconnecting in 5s") + await asyncio.sleep(5) + + async def _subscribe_stream(self) -> None: + """Subscribe to the gRPC stream and process updates.""" + assert self._channel is not None + call = self._channel.unary_stream( + _SUBSCRIBE, + request_serializer=lambda x: x, + response_deserializer=lambda x: x, + ) + + stream = call(b"") + async for response in stream: + try: + self._process_notification(response) + except Exception: + _LOGGER.debug("Error processing notification", exc_info=True) + + def _process_notification(self, data: bytes) -> None: + """Process a TraitInstanceNotification from the stream.""" + fields = _parse_protobuf_fields(data) + + rti_data = _get_field(fields, 1) + if not rti_data or not isinstance(rti_data, bytes): + return + + rti_fields = _parse_protobuf_fields(rti_data) + ext_data = _get_field(rti_fields, 2) + if not ext_data or not isinstance(ext_data, bytes): + return + + ext_fields = _parse_protobuf_fields(ext_data) + info_data = _get_field(ext_fields, 2) + if not info_data or not isinstance(info_data, bytes): + return + + info_fields = _parse_protobuf_fields(info_data) + meta_data = _get_field(info_fields, 1) + if not meta_data or not isinstance(meta_data, bytes): + return + + meta_fields = _parse_protobuf_fields(meta_data) + trait_id = _get_field(meta_fields, 3, 0) + + iid_data = _get_field(info_fields, 2) + instance_id = 0 + if iid_data and isinstance(iid_data, bytes): + iid_fields = _parse_protobuf_fields(iid_data) + instance_id = _get_field(iid_fields, 1, 0) + + # Only process trait 26 (power metrics) + if trait_id != TRAIT_POWER_METRICS: + return + + notify_data = _get_field(fields, 2) + if not notify_data or not isinstance(notify_data, bytes): + return + + notify_fields = _parse_protobuf_fields(notify_data) + + metrics_list = notify_fields.get(3, []) + for metric_data in metrics_list: + if not isinstance(metric_data, bytes): + continue + + ml_fields = _parse_protobuf_fields(metric_data) + raw_metrics = ml_fields.get(3, []) + + for raw in raw_metrics: + if not isinstance(raw, bytes): + continue + self._decode_and_store_metric(instance_id, raw) + + self._notify() + + def _decode_and_store_metric(self, iid: int, raw: bytes) -> None: + """Decode a raw metric payload and store it.""" + top_fields = _parse_protobuf_fields(raw) + + # Main feed (IID 1) uses field 14 with deeper nesting + if iid == MAIN_FEED_IID: + self._data.main_feed = _decode_main_feed(raw) + return + + # Look up circuit_id from the discovered mapping + circuit_id = self._data.metric_iid_to_circuit.get(iid) + if circuit_id is None: + return + + # Dual-phase (field 12) — check first since it's more specific + dual_data = _get_field(top_fields, 12) + if dual_data and isinstance(dual_data, bytes): + self._data.metrics[circuit_id] = _decode_dual_phase(dual_data) + if circuit_id in self._data.circuits: + self._data.circuits[circuit_id].is_dual_phase = True + return + + # Single-phase (field 11) + single_data = _get_field(top_fields, 11) + if single_data and isinstance(single_data, bytes): + self._data.metrics[circuit_id] = _decode_single_phase(single_data) + if circuit_id in self._data.circuits: + self._data.circuits[circuit_id].is_dual_phase = False diff --git a/custom_components/span_panel/manifest.json b/custom_components/span_panel/manifest.json index bff2ab9..a3bf70c 100644 --- a/custom_components/span_panel/manifest.json +++ b/custom_components/span_panel/manifest.json @@ -12,9 +12,10 @@ "iot_class": "local_polling", "issue_tracker": "https://github.com/SpanPanel/span/issues", "requirements": [ - "span-panel-api~=1.1.14" + "span-panel-api~=1.1.14", + "grpcio>=1.60.0" ], - "version": "1.3.1", + "version": "1.4.0", "zeroconf": [ { "type": "_span._tcp.local." diff --git a/custom_components/span_panel/sensor.py b/custom_components/span_panel/sensor.py index b63bb3c..2a6c87a 100644 --- a/custom_components/span_panel/sensor.py +++ b/custom_components/span_panel/sensor.py @@ -6,10 +6,11 @@ from homeassistant.core import HomeAssistant from homeassistant.helpers.entity_platform import AddEntitiesCallback -from .const import COORDINATOR, DOMAIN +from .const import CONF_PANEL_GEN, COORDINATOR, DOMAIN from .coordinator import SpanPanelCoordinator from .sensors import ( SpanCircuitEnergySensor, + SpanCircuitPositionSensor, SpanCircuitPowerSensor, SpanEnergySensorBase, SpanPanelBattery, @@ -37,6 +38,7 @@ "SpanPanelEnergySensor", "SpanCircuitPowerSensor", "SpanCircuitEnergySensor", + "SpanCircuitPositionSensor", "SpanUnmappedCircuitSensor", "SpanSolarSensor", "SpanSolarEnergySensor", @@ -55,6 +57,16 @@ async def async_setup_entry( async_add_entities: AddEntitiesCallback, ) -> None: """Set up sensor platform.""" + # Gen3 path — use Gen3 sensor factory + if config_entry.data.get(CONF_PANEL_GEN) == "gen3": + from .gen3.sensors import ( # pylint: disable=import-outside-toplevel + create_gen3_sensors, # noqa: E402 + ) + + gen3_coordinator = hass.data[DOMAIN][config_entry.entry_id][COORDINATOR] + async_add_entities(create_gen3_sensors(gen3_coordinator)) + return + try: data = hass.data[DOMAIN][config_entry.entry_id] coordinator: SpanPanelCoordinator = data[COORDINATOR] diff --git a/custom_components/span_panel/sensor_definitions.py b/custom_components/span_panel/sensor_definitions.py index 87d3a3f..0d944ec 100644 --- a/custom_components/span_panel/sensor_definitions.py +++ b/custom_components/span_panel/sensor_definitions.py @@ -36,7 +36,7 @@ class SpanPanelCircuitsRequiredKeysMixin: """Required keys mixin for Span Panel circuit sensors.""" - value_fn: Callable[[SpanPanelCircuit], float] + value_fn: Callable[[SpanPanelCircuit], float | None] @dataclass(frozen=True) @@ -252,8 +252,10 @@ class SpanPanelBatterySensorEntityDescription( state_class=SensorStateClass.TOTAL, suggested_display_precision=2, device_class=SensorDeviceClass.ENERGY, - value_fn=lambda panel_data: (panel_data.main_meter_energy_consumed or 0) - - (panel_data.main_meter_energy_produced or 0), + value_fn=lambda panel_data: ( + (panel_data.main_meter_energy_consumed or 0) + - (panel_data.main_meter_energy_produced or 0) + ), ), SpanPanelDataSensorEntityDescription( key="feedthroughNetEnergyWh", @@ -262,8 +264,10 @@ class SpanPanelBatterySensorEntityDescription( state_class=SensorStateClass.TOTAL, suggested_display_precision=2, device_class=SensorDeviceClass.ENERGY, - value_fn=lambda panel_data: (panel_data.feedthrough_energy_consumed or 0) - - (panel_data.feedthrough_energy_produced or 0), + value_fn=lambda panel_data: ( + (panel_data.feedthrough_energy_consumed or 0) + - (panel_data.feedthrough_energy_produced or 0) + ), ), ) @@ -273,6 +277,7 @@ class SpanPanelBatterySensorEntityDescription( SpanPanelCircuitsSensorEntityDescription, SpanPanelCircuitsSensorEntityDescription, SpanPanelCircuitsSensorEntityDescription, + SpanPanelCircuitsSensorEntityDescription, ] = ( SpanPanelCircuitsSensorEntityDescription( key="circuit_power", @@ -318,6 +323,17 @@ class SpanPanelBatterySensorEntityDescription( entity_registry_enabled_default=True, entity_registry_visible_default=True, ), + SpanPanelCircuitsSensorEntityDescription( + key="circuit_panel_position", + name="Panel Position", + native_unit_of_measurement=None, + state_class=None, + suggested_display_precision=0, + device_class=None, + value_fn=lambda circuit: min(circuit.tabs) if circuit.tabs else None, + entity_registry_enabled_default=True, + entity_registry_visible_default=True, + ), ) diff --git a/custom_components/span_panel/sensors/__init__.py b/custom_components/span_panel/sensors/__init__.py index 897c13f..1280d9b 100644 --- a/custom_components/span_panel/sensors/__init__.py +++ b/custom_components/span_panel/sensors/__init__.py @@ -1,7 +1,12 @@ """Sensors package for Span Panel integration.""" from .base import SpanEnergySensorBase, SpanSensorBase -from .circuit import SpanCircuitEnergySensor, SpanCircuitPowerSensor, SpanUnmappedCircuitSensor +from .circuit import ( + SpanCircuitEnergySensor, + SpanCircuitPositionSensor, + SpanCircuitPowerSensor, + SpanUnmappedCircuitSensor, +) from .factory import create_native_sensors, enable_unmapped_tab_entities from .panel import ( SpanPanelBattery, @@ -22,6 +27,7 @@ "SpanPanelEnergySensor", "SpanCircuitPowerSensor", "SpanCircuitEnergySensor", + "SpanCircuitPositionSensor", "SpanUnmappedCircuitSensor", "SpanSolarSensor", "SpanSolarEnergySensor", diff --git a/custom_components/span_panel/sensors/base.py b/custom_components/span_panel/sensors/base.py index c53d44e..b544261 100644 --- a/custom_components/span_panel/sensors/base.py +++ b/custom_components/span_panel/sensors/base.py @@ -8,7 +8,7 @@ from datetime import date, datetime, timedelta from decimal import Decimal import logging -from typing import Any, Generic, Self, TypeVar +from typing import Any, Self from homeassistant.components.sensor import ( RestoreSensor, @@ -34,9 +34,6 @@ # Sentinel value to distinguish "never synced" from "circuit name is None" _NAME_UNSET: object = object() -T = TypeVar("T", bound=SensorEntityDescription) -D = TypeVar("D") # For the type returned by get_data_source - def _parse_numeric_state(state: State | None) -> tuple[float | None, datetime | None]: """Extract a numeric value and naive timestamp from a restored HA state. @@ -60,7 +57,9 @@ def _parse_numeric_state(state: State | None) -> tuple[float | None, datetime | return value, last_changed -class SpanSensorBase(CoordinatorEntity[SpanPanelCoordinator], SensorEntity, Generic[T, D], ABC): +class SpanSensorBase[T: SensorEntityDescription, D]( + CoordinatorEntity[SpanPanelCoordinator], SensorEntity, ABC +): """Abstract base class for Span Panel Sensors with overrideable methods.""" _attr_has_entity_name = True @@ -395,7 +394,7 @@ def from_dict(cls, restored: dict[str, Any]) -> Self | None: return None -class SpanEnergySensorBase(SpanSensorBase[T, D], RestoreSensor, ABC): +class SpanEnergySensorBase[T: SensorEntityDescription, D](SpanSensorBase[T, D], RestoreSensor, ABC): """Base class for energy sensors that includes grace period tracking. This class extends SpanSensorBase with: diff --git a/custom_components/span_panel/sensors/circuit.py b/custom_components/span_panel/sensors/circuit.py index bb1c3dc..72ea8fb 100644 --- a/custom_components/span_panel/sensors/circuit.py +++ b/custom_components/span_panel/sensors/circuit.py @@ -286,6 +286,107 @@ def extra_state_attributes(self) -> dict[str, Any] | None: return attributes if attributes else None +class SpanCircuitPositionSensor( + SpanSensorBase[SpanPanelCircuitsSensorEntityDescription, SpanPanelCircuit] +): + """Circuit panel position (breaker slot number) sensor for Gen2 panels.""" + + def __init__( + self, + data_coordinator: SpanPanelCoordinator, + description: SpanPanelCircuitsSensorEntityDescription, + span_panel: SpanPanel, + circuit_id: str, + ) -> None: + """Initialize the circuit panel position sensor.""" + self.circuit_id = circuit_id + self.original_key = description.key + + description_with_circuit = SpanPanelCircuitsSensorEntityDescription( + key=circuit_id, + name=description.name, + native_unit_of_measurement=description.native_unit_of_measurement, + state_class=description.state_class, + suggested_display_precision=description.suggested_display_precision, + device_class=description.device_class, + value_fn=description.value_fn, + entity_registry_enabled_default=description.entity_registry_enabled_default, + entity_registry_visible_default=description.entity_registry_visible_default, + ) + + super().__init__(data_coordinator, description_with_circuit, span_panel) + + def _generate_unique_id( + self, span_panel: SpanPanel, description: SpanPanelCircuitsSensorEntityDescription + ) -> str: + """Generate unique ID for circuit position sensors.""" + return construct_circuit_unique_id_for_entry( + self.coordinator, + span_panel, + self.circuit_id, + "circuit_panel_position", + self._device_name, + ) + + def _generate_friendly_name( + self, span_panel: SpanPanel, description: SpanPanelCircuitsSensorEntityDescription + ) -> str | None: + """Generate friendly name for circuit position sensors.""" + circuit = span_panel.circuits.get(self.circuit_id) + if not circuit: + return construct_unmapped_friendly_name( + self.circuit_id, str(description.name or "Panel Position") + ) + + use_circuit_numbers = self.coordinator.config_entry.options.get(USE_CIRCUIT_NUMBERS, False) + + if use_circuit_numbers: + if circuit.tabs and len(circuit.tabs) == 2: + sorted_tabs = sorted(circuit.tabs) + circuit_identifier = f"Circuit {sorted_tabs[0]} {sorted_tabs[1]}" + elif circuit.tabs and len(circuit.tabs) == 1: + circuit_identifier = f"Circuit {circuit.tabs[0]}" + else: + circuit_identifier = f"Circuit {self.circuit_id}" + else: + if circuit.name is None: + return None + circuit_identifier = circuit.name + + return f"{circuit_identifier} {description.name or 'Panel Position'}" + + def _generate_panel_name( + self, span_panel: SpanPanel, description: SpanPanelCircuitsSensorEntityDescription + ) -> str | None: + """Generate panel name for circuit position sensors.""" + circuit = span_panel.circuits.get(self.circuit_id) + if not circuit: + return construct_unmapped_friendly_name( + self.circuit_id, str(description.name or "Panel Position") + ) + if circuit.name is None: + return None + return f"{circuit.name} {description.name or 'Panel Position'}" + + def get_data_source(self, span_panel: SpanPanel) -> SpanPanelCircuit: + """Get the data source for the circuit position sensor.""" + circuit = span_panel.circuits.get(self.circuit_id) + if circuit is None: + raise ValueError(f"Circuit {self.circuit_id} not found in panel data") + return circuit + + @property + def native_value(self) -> int | None: + """Return the breaker slot number (lowest tab position).""" + if not self.coordinator.last_update_success or not self.coordinator.data: + return None + circuit = self.coordinator.data.circuits.get(self.circuit_id) + if not circuit or not circuit.tabs: + return None + tab_min: int = min(circuit.tabs) + return tab_min + + class SpanUnmappedCircuitSensor( SpanSensorBase[SpanPanelCircuitsSensorEntityDescription, SpanPanelCircuit] ): diff --git a/custom_components/span_panel/sensors/factory.py b/custom_components/span_panel/sensors/factory.py index a939cd7..aa4b3e4 100644 --- a/custom_components/span_panel/sensors/factory.py +++ b/custom_components/span_panel/sensors/factory.py @@ -33,7 +33,12 @@ ) from custom_components.span_panel.span_panel import SpanPanel -from .circuit import SpanCircuitEnergySensor, SpanCircuitPowerSensor, SpanUnmappedCircuitSensor +from .circuit import ( + SpanCircuitEnergySensor, + SpanCircuitPositionSensor, + SpanCircuitPowerSensor, + SpanUnmappedCircuitSensor, +) from .panel import ( SpanPanelBattery, SpanPanelEnergySensor, @@ -83,9 +88,11 @@ def create_panel_sensors( def create_circuit_sensors( coordinator: SpanPanelCoordinator, span_panel: SpanPanel, config_entry: ConfigEntry -) -> list[SpanCircuitPowerSensor | SpanCircuitEnergySensor]: +) -> list[SpanCircuitPowerSensor | SpanCircuitEnergySensor | SpanCircuitPositionSensor]: """Create circuit-level sensors for named circuits.""" - entities: list[SpanCircuitPowerSensor | SpanCircuitEnergySensor] = [] + entities: list[ + SpanCircuitPowerSensor | SpanCircuitEnergySensor | SpanCircuitPositionSensor + ] = [] # Add circuit sensors for all named circuits (replacing synthetic ones) named_circuits = [cid for cid in span_panel.circuits if not cid.startswith("unmapped_tab_")] @@ -106,6 +113,13 @@ def create_circuit_sensors( entities.append( SpanCircuitPowerSensor(coordinator, circuit_description, span_panel, circuit_id) ) + elif circuit_description.key == "circuit_panel_position": + # Use position sensor for breaker slot number + entities.append( + SpanCircuitPositionSensor( + coordinator, circuit_description, span_panel, circuit_id + ) + ) else: # Use energy sensor with grace period tracking for energy measurements entities.append( @@ -230,6 +244,7 @@ def create_native_sensors( | SpanPanelEnergySensor | SpanCircuitPowerSensor | SpanCircuitEnergySensor + | SpanCircuitPositionSensor | SpanUnmappedCircuitSensor | SpanPanelBattery | SpanSolarSensor @@ -243,6 +258,7 @@ def create_native_sensors( | SpanPanelEnergySensor | SpanCircuitPowerSensor | SpanCircuitEnergySensor + | SpanCircuitPositionSensor | SpanUnmappedCircuitSensor | SpanPanelBattery | SpanSolarSensor diff --git a/poetry.lock b/poetry.lock index 9a926e5..85fd9a2 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 2.1.4 and should not be changed by hand. +# This file is automatically @generated by Poetry 2.1.3 and should not be changed by hand. [[package]] name = "acme" @@ -1838,7 +1838,7 @@ description = "Lightweight in-process concurrent programming" optional = false python-versions = ">=3.9" groups = ["main", "dev"] -markers = "platform_machine == \"aarch64\" or platform_machine == \"ppc64le\" or platform_machine == \"x86_64\" or platform_machine == \"amd64\" or platform_machine == \"AMD64\" or platform_machine == \"win32\" or platform_machine == \"WIN32\"" +markers = "python_version < \"3.14\" and (platform_machine == \"aarch64\" or platform_machine == \"ppc64le\" or platform_machine == \"x86_64\" or platform_machine == \"amd64\" or platform_machine == \"AMD64\" or platform_machine == \"win32\" or platform_machine == \"WIN32\")" files = [ {file = "greenlet-3.2.4-cp310-cp310-macosx_11_0_universal2.whl", hash = "sha256:8c68325b0d0acf8d91dde4e6f930967dd52a5302cd4062932a6b2e7c2969f47c"}, {file = "greenlet-3.2.4-cp310-cp310-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:94385f101946790ae13da500603491f04a76b6e4c059dab271b3ce2e283b2590"}, @@ -3836,7 +3836,10 @@ files = [ ] [package.dependencies] -cffi = {version = ">=1.5.0", markers = "python_version < \"3.14\""} +cffi = [ + {version = ">=1.5.0", markers = "python_version < \"3.14\""}, + {version = ">=2.0.0b1", markers = "python_version >= \"3.14\""}, +] [package.extras] idna = ["idna (>=2.1)"] @@ -5137,7 +5140,7 @@ test = ["covdefaults (==2.3.0)", "pytest (==8.4.1)", "pytest-aiohttp (==1.1.0)", [[package]] name = "span-panel-api" -version = "1.1.14" +version = "1.1.15" description = "A client library for SPAN Panel API" optional = false python-versions = ">=3.10,<4.0" @@ -5153,6 +5156,9 @@ numpy = ">=1.21.0" python-dateutil = ">=2.8.0" pyyaml = ">=6.0.0" +[package.extras] +grpc = ["grpcio (>=1.50.0)"] + [package.source] type = "directory" url = "../span-panel-api" @@ -6314,5 +6320,5 @@ type = ["pytest-mypy"] [metadata] lock-version = "2.1" -python-versions = ">=3.13.2,<3.14" -content-hash = "d2f773c82e16ff28e156c9f3cf068ed68b2d776c27a9c86f2210159f3bfecc2f" +python-versions = ">=3.13.2,<3.15" +content-hash = "d82eb51ede9ec16dc4e2cf09c0e51db9315f2469d650b9160dd4bd54b67e5e22" diff --git a/pyproject.toml b/pyproject.toml index 1b7b426..b15d6cd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,7 +9,7 @@ readme = "README.md" package-mode = false [tool.poetry.dependencies] -python = ">=3.13.2,<3.14" +python = ">=3.13.2,<3.15" homeassistant = "2025.12.4,<2026.0.0" # Pin to exact version for custom component compatibility span-panel-api = {path = "../span-panel-api", develop = true} @@ -130,7 +130,7 @@ venv = ".venv" [tool.ruff] line-length = 100 -target-version = "py311" +target-version = "py313" [tool.ruff.lint] select = [