mirror of
https://github.com/jpawlowski/hass.tibber_prices.git
synced 2026-03-30 05:13:40 +00:00
Changed all selector option keys from uppercase to lowercase to comply with Home Assistant's hassfest validation pattern [a-z0-9-_]+. Fixed inconsistency in PEAK_PRICE_MIN_LEVEL_OPTIONS where some values were uppercase while others were lowercase. Changes: - translations/*.json: All selector keys now lowercase (volatility, price_level) - const.py: Added .lower() to all PEAK_PRICE_MIN_LEVEL_OPTIONS values - binary_sensor.py: Added .upper() conversion when looking up price levels in PRICE_LEVEL_MAPPING to handle lowercase config values Impact: Config flow now works correctly with translated selector options. Hassfest validation passes without selector key errors.
1054 lines
43 KiB
Python
1054 lines
43 KiB
Python
"""Binary sensor platform for tibber_prices."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from datetime import datetime, timedelta
|
|
from typing import TYPE_CHECKING
|
|
|
|
from homeassistant.components.binary_sensor import (
|
|
BinarySensorDeviceClass,
|
|
BinarySensorEntity,
|
|
BinarySensorEntityDescription,
|
|
)
|
|
from homeassistant.const import PERCENTAGE, EntityCategory
|
|
from homeassistant.core import callback
|
|
from homeassistant.util import dt as dt_util
|
|
|
|
from .coordinator import TIME_SENSITIVE_ENTITY_KEYS
|
|
from .entity import TibberPricesEntity
|
|
|
|
if TYPE_CHECKING:
|
|
from collections.abc import Callable
|
|
|
|
from homeassistant.core import HomeAssistant
|
|
from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
|
|
|
from .coordinator import TibberPricesDataUpdateCoordinator
|
|
from .data import TibberPricesConfigEntry
|
|
|
|
from .const import (
|
|
CONF_BEST_PRICE_MAX_LEVEL,
|
|
CONF_BEST_PRICE_MIN_VOLATILITY,
|
|
CONF_EXTENDED_DESCRIPTIONS,
|
|
CONF_PEAK_PRICE_MIN_LEVEL,
|
|
CONF_PEAK_PRICE_MIN_VOLATILITY,
|
|
CONF_VOLATILITY_THRESHOLD_HIGH,
|
|
CONF_VOLATILITY_THRESHOLD_MODERATE,
|
|
CONF_VOLATILITY_THRESHOLD_VERY_HIGH,
|
|
DEFAULT_BEST_PRICE_MAX_LEVEL,
|
|
DEFAULT_BEST_PRICE_MIN_VOLATILITY,
|
|
DEFAULT_EXTENDED_DESCRIPTIONS,
|
|
DEFAULT_PEAK_PRICE_MIN_LEVEL,
|
|
DEFAULT_PEAK_PRICE_MIN_VOLATILITY,
|
|
DEFAULT_VOLATILITY_THRESHOLD_HIGH,
|
|
DEFAULT_VOLATILITY_THRESHOLD_MODERATE,
|
|
DEFAULT_VOLATILITY_THRESHOLD_VERY_HIGH,
|
|
PRICE_LEVEL_MAPPING,
|
|
VOLATILITY_HIGH,
|
|
VOLATILITY_MODERATE,
|
|
VOLATILITY_VERY_HIGH,
|
|
async_get_entity_description,
|
|
get_entity_description,
|
|
)
|
|
|
|
MINUTES_PER_INTERVAL = 15
|
|
MIN_TOMORROW_INTERVALS_15MIN = 96
|
|
|
|
ENTITY_DESCRIPTIONS = (
|
|
BinarySensorEntityDescription(
|
|
key="peak_price_period",
|
|
translation_key="peak_price_period",
|
|
name="Peak Price Interval",
|
|
icon="mdi:clock-alert",
|
|
),
|
|
BinarySensorEntityDescription(
|
|
key="best_price_period",
|
|
translation_key="best_price_period",
|
|
name="Best Price Interval",
|
|
icon="mdi:clock-check",
|
|
),
|
|
BinarySensorEntityDescription(
|
|
key="connection",
|
|
translation_key="connection",
|
|
name="Tibber API Connection",
|
|
device_class=BinarySensorDeviceClass.CONNECTIVITY,
|
|
entity_category=EntityCategory.DIAGNOSTIC,
|
|
),
|
|
BinarySensorEntityDescription(
|
|
key="tomorrow_data_available",
|
|
translation_key="tomorrow_data_available",
|
|
name="Tomorrow's Data Available",
|
|
icon="mdi:calendar-check",
|
|
entity_category=EntityCategory.DIAGNOSTIC,
|
|
),
|
|
)
|
|
|
|
|
|
async def async_setup_entry(
|
|
_hass: HomeAssistant,
|
|
entry: TibberPricesConfigEntry,
|
|
async_add_entities: AddEntitiesCallback,
|
|
) -> None:
|
|
"""Set up the binary_sensor platform."""
|
|
async_add_entities(
|
|
TibberPricesBinarySensor(
|
|
coordinator=entry.runtime_data.coordinator,
|
|
entity_description=entity_description,
|
|
)
|
|
for entity_description in ENTITY_DESCRIPTIONS
|
|
)
|
|
|
|
|
|
class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
|
|
"""tibber_prices binary_sensor class."""
|
|
|
|
def __init__(
|
|
self,
|
|
coordinator: TibberPricesDataUpdateCoordinator,
|
|
entity_description: BinarySensorEntityDescription,
|
|
) -> None:
|
|
"""Initialize the binary_sensor class."""
|
|
super().__init__(coordinator)
|
|
self.entity_description = entity_description
|
|
self._attr_unique_id = f"{coordinator.config_entry.entry_id}_{entity_description.key}"
|
|
self._state_getter: Callable | None = self._get_state_getter()
|
|
self._attribute_getter: Callable | None = self._get_attribute_getter()
|
|
self._time_sensitive_remove_listener: Callable | None = None
|
|
|
|
async def async_added_to_hass(self) -> None:
|
|
"""When entity is added to hass."""
|
|
await super().async_added_to_hass()
|
|
|
|
# Register with coordinator for time-sensitive updates if applicable
|
|
if self.entity_description.key in TIME_SENSITIVE_ENTITY_KEYS:
|
|
self._time_sensitive_remove_listener = self.coordinator.async_add_time_sensitive_listener(
|
|
self._handle_time_sensitive_update
|
|
)
|
|
|
|
async def async_will_remove_from_hass(self) -> None:
|
|
"""When entity will be removed from hass."""
|
|
await super().async_will_remove_from_hass()
|
|
|
|
# Remove time-sensitive listener if registered
|
|
if self._time_sensitive_remove_listener:
|
|
self._time_sensitive_remove_listener()
|
|
self._time_sensitive_remove_listener = None
|
|
|
|
@callback
|
|
def _handle_time_sensitive_update(self) -> None:
|
|
"""Handle time-sensitive update from coordinator."""
|
|
self.async_write_ha_state()
|
|
|
|
def _get_state_getter(self) -> Callable | None:
|
|
"""Return the appropriate state getter method based on the sensor type."""
|
|
key = self.entity_description.key
|
|
|
|
if key == "peak_price_period":
|
|
return self._peak_price_state
|
|
if key == "best_price_period":
|
|
return self._best_price_state
|
|
if key == "connection":
|
|
return lambda: True if self.coordinator.data else None
|
|
if key == "tomorrow_data_available":
|
|
return self._tomorrow_data_available_state
|
|
|
|
return None
|
|
|
|
def _best_price_state(self) -> bool | None:
|
|
"""Return True if the current time is within a best price period."""
|
|
if not self.coordinator.data:
|
|
return None
|
|
attrs = self._get_price_intervals_attributes(reverse_sort=False)
|
|
if not attrs:
|
|
return False # Should not happen, but safety fallback
|
|
start = attrs.get("start")
|
|
end = attrs.get("end")
|
|
if not start or not end:
|
|
return False # No period found = sensor is off
|
|
now = dt_util.now()
|
|
return start <= now < end
|
|
|
|
def _peak_price_state(self) -> bool | None:
|
|
"""Return True if the current time is within a peak price period."""
|
|
if not self.coordinator.data:
|
|
return None
|
|
attrs = self._get_price_intervals_attributes(reverse_sort=True)
|
|
if not attrs:
|
|
return False # Should not happen, but safety fallback
|
|
start = attrs.get("start")
|
|
end = attrs.get("end")
|
|
if not start or not end:
|
|
return False # No period found = sensor is off
|
|
now = dt_util.now()
|
|
return start <= now < end
|
|
|
|
def _tomorrow_data_available_state(self) -> bool | None:
|
|
"""Return True if tomorrow's data is fully available, False if not, None if unknown."""
|
|
if not self.coordinator.data:
|
|
return None
|
|
price_info = self.coordinator.data.get("priceInfo", {})
|
|
tomorrow_prices = price_info.get("tomorrow", [])
|
|
interval_count = len(tomorrow_prices)
|
|
if interval_count == MIN_TOMORROW_INTERVALS_15MIN:
|
|
return True
|
|
if interval_count == 0:
|
|
return False
|
|
return False
|
|
|
|
def _get_tomorrow_data_available_attributes(self) -> dict | None:
|
|
"""Return attributes for tomorrow_data_available binary sensor."""
|
|
if not self.coordinator.data:
|
|
return None
|
|
price_info = self.coordinator.data.get("priceInfo", {})
|
|
tomorrow_prices = price_info.get("tomorrow", [])
|
|
interval_count = len(tomorrow_prices)
|
|
if interval_count == 0:
|
|
status = "none"
|
|
elif interval_count == MIN_TOMORROW_INTERVALS_15MIN:
|
|
status = "full"
|
|
else:
|
|
status = "partial"
|
|
return {
|
|
"intervals_available": interval_count,
|
|
"data_status": status,
|
|
}
|
|
|
|
def _get_attribute_getter(self) -> Callable | None:
|
|
"""Return the appropriate attribute getter method based on the sensor type."""
|
|
key = self.entity_description.key
|
|
|
|
if key == "peak_price_period":
|
|
return lambda: self._get_price_intervals_attributes(reverse_sort=True)
|
|
if key == "best_price_period":
|
|
return lambda: self._get_price_intervals_attributes(reverse_sort=False)
|
|
if key == "tomorrow_data_available":
|
|
return self._get_tomorrow_data_available_attributes
|
|
|
|
return None
|
|
|
|
def _get_precomputed_period_data(self, *, reverse_sort: bool) -> dict | None:
|
|
"""
|
|
Get precomputed period data from coordinator.
|
|
|
|
Returns lightweight period summaries (no full price data to avoid redundancy).
|
|
"""
|
|
if not self.coordinator.data:
|
|
return None
|
|
|
|
periods_data = self.coordinator.data.get("periods", {})
|
|
period_type = "peak_price" if reverse_sort else "best_price"
|
|
return periods_data.get(period_type)
|
|
|
|
def _get_period_intervals_from_price_info(self, period_summaries: list[dict], *, reverse_sort: bool) -> list[dict]:
|
|
"""
|
|
Build full interval data from period summaries and priceInfo.
|
|
|
|
This avoids storing price data redundantly by fetching it on-demand from priceInfo.
|
|
"""
|
|
if not self.coordinator.data or not period_summaries:
|
|
return []
|
|
|
|
price_info = self.coordinator.data.get("priceInfo", {})
|
|
yesterday = price_info.get("yesterday", [])
|
|
today = price_info.get("today", [])
|
|
tomorrow = price_info.get("tomorrow", [])
|
|
|
|
# Build a quick lookup for prices by timestamp
|
|
all_prices = yesterday + today + tomorrow
|
|
price_lookup = {}
|
|
for price_data in all_prices:
|
|
starts_at = dt_util.parse_datetime(price_data["startsAt"])
|
|
if starts_at:
|
|
starts_at = dt_util.as_local(starts_at)
|
|
price_lookup[starts_at.isoformat()] = price_data
|
|
|
|
# Get reference data for annotations
|
|
period_data = self._get_precomputed_period_data(reverse_sort=reverse_sort)
|
|
if not period_data:
|
|
return []
|
|
|
|
ref_data = period_data.get("reference_data", {})
|
|
ref_prices = ref_data.get("ref_prices", {})
|
|
avg_prices = ref_data.get("avg_prices", {})
|
|
|
|
# Build annotated intervals from period summaries
|
|
intervals = []
|
|
period_count = len(period_summaries)
|
|
|
|
for period_idx, period_summary in enumerate(period_summaries, 1):
|
|
period_start = period_summary.get("start")
|
|
period_end = period_summary.get("end")
|
|
interval_starts = period_summary.get("interval_starts", [])
|
|
interval_count = len(interval_starts)
|
|
duration_minutes = period_summary.get("duration_minutes", 0)
|
|
periods_remaining = period_count - period_idx
|
|
|
|
for interval_idx, start_iso in enumerate(interval_starts, 1):
|
|
# Get price data from priceInfo
|
|
price_data = price_lookup.get(start_iso)
|
|
if not price_data:
|
|
continue
|
|
|
|
starts_at = dt_util.parse_datetime(price_data["startsAt"])
|
|
if not starts_at:
|
|
continue
|
|
starts_at = dt_util.as_local(starts_at)
|
|
date_key = starts_at.date().isoformat()
|
|
|
|
price_raw = float(price_data["total"])
|
|
price_minor = round(price_raw * 100, 2)
|
|
|
|
# Get reference values for this day
|
|
ref_price = ref_prices.get(date_key, 0.0)
|
|
avg_price = avg_prices.get(date_key, 0.0)
|
|
|
|
# Calculate price difference
|
|
price_diff = price_raw - ref_price
|
|
price_diff_minor = round(price_diff * 100, 2)
|
|
price_diff_pct = (price_diff / ref_price) * 100 if ref_price != 0 else 0.0
|
|
|
|
interval_remaining = interval_count - interval_idx
|
|
interval_end = starts_at + timedelta(minutes=MINUTES_PER_INTERVAL)
|
|
|
|
annotated = {
|
|
# Period-level attributes
|
|
"period_start": period_start,
|
|
"period_end": period_end,
|
|
"hour": period_start.hour if period_start else None,
|
|
"minute": period_start.minute if period_start else None,
|
|
"time": f"{period_start.hour:02d}:{period_start.minute:02d}" if period_start else None,
|
|
"duration_minutes": duration_minutes,
|
|
"remaining_minutes_in_period": interval_remaining * MINUTES_PER_INTERVAL,
|
|
"periods_total": period_count,
|
|
"periods_remaining": periods_remaining,
|
|
"period_position": period_idx,
|
|
# Interval-level attributes
|
|
"price": price_minor,
|
|
# Internal fields
|
|
"_interval_start": starts_at,
|
|
"_interval_end": interval_end,
|
|
"_ref_price": ref_price,
|
|
"_avg_price": avg_price,
|
|
}
|
|
|
|
# Add price difference attributes based on sensor type
|
|
if reverse_sort:
|
|
annotated["price_diff_from_max"] = price_diff_minor
|
|
annotated[f"price_diff_from_max_{PERCENTAGE}"] = round(price_diff_pct, 2)
|
|
else:
|
|
annotated["price_diff_from_min"] = price_diff_minor
|
|
annotated[f"price_diff_from_min_{PERCENTAGE}"] = round(price_diff_pct, 2)
|
|
|
|
intervals.append(annotated)
|
|
|
|
return intervals
|
|
|
|
def _get_price_intervals_attributes(self, *, reverse_sort: bool) -> dict | None:
|
|
"""
|
|
Get price interval attributes using precomputed data from coordinator.
|
|
|
|
This method now:
|
|
1. Gets lightweight period summaries from coordinator
|
|
2. Fetches actual price data from priceInfo on-demand
|
|
3. Builds annotations without storing data redundantly
|
|
4. Filters periods based on volatility and level thresholds if configured
|
|
"""
|
|
# Check if periods should be filtered based on volatility and level
|
|
if not self._should_show_periods(reverse_sort=reverse_sort):
|
|
return self._build_empty_periods_result(reverse_sort=reverse_sort)
|
|
|
|
# Get precomputed period summaries from coordinator
|
|
period_data = self._get_precomputed_period_data(reverse_sort=reverse_sort)
|
|
if not period_data:
|
|
return self._build_no_periods_result()
|
|
|
|
period_summaries = period_data.get("periods", [])
|
|
if not period_summaries:
|
|
return self._build_no_periods_result()
|
|
|
|
# Build full interval data from summaries + priceInfo
|
|
intervals = self._get_period_intervals_from_price_info(period_summaries, reverse_sort=reverse_sort)
|
|
if not intervals:
|
|
return self._build_no_periods_result()
|
|
|
|
# Find current or next interval
|
|
current_interval = self._find_current_or_next_interval(intervals)
|
|
|
|
# Build periods summary (merge with original summaries to include level/rating_level)
|
|
periods_summary = self._build_periods_summary(intervals, period_summaries)
|
|
|
|
# Build final attributes
|
|
return self._build_final_attributes(current_interval, periods_summary, intervals)
|
|
|
|
def _should_show_periods(self, *, reverse_sort: bool) -> bool:
|
|
"""
|
|
Check if periods should be shown based on volatility AND level filters (UND-Verknüpfung).
|
|
|
|
Args:
|
|
reverse_sort: If False (best_price), checks max_level filter.
|
|
If True (peak_price), checks min_level filter.
|
|
|
|
Returns:
|
|
True if periods should be displayed, False if they should be filtered out.
|
|
Both conditions must be met for periods to be shown.
|
|
|
|
"""
|
|
if not self.coordinator.data:
|
|
return True
|
|
|
|
# Check volatility filter
|
|
if not self._check_volatility_filter(reverse_sort=reverse_sort):
|
|
return False
|
|
|
|
# Check level filter (UND-Verknüpfung)
|
|
return self._check_level_filter(reverse_sort=reverse_sort)
|
|
|
|
def _check_volatility_filter(self, *, reverse_sort: bool) -> bool:
|
|
"""
|
|
Check if today's volatility meets the minimum requirement.
|
|
|
|
Args:
|
|
reverse_sort: If False (best_price), uses CONF_BEST_PRICE_MIN_VOLATILITY.
|
|
If True (peak_price), uses CONF_PEAK_PRICE_MIN_VOLATILITY.
|
|
|
|
"""
|
|
# Get appropriate volatility config based on sensor type
|
|
if reverse_sort:
|
|
# Peak price sensor
|
|
min_volatility = self.coordinator.config_entry.options.get(
|
|
CONF_PEAK_PRICE_MIN_VOLATILITY,
|
|
DEFAULT_PEAK_PRICE_MIN_VOLATILITY,
|
|
)
|
|
else:
|
|
# Best price sensor
|
|
min_volatility = self.coordinator.config_entry.options.get(
|
|
CONF_BEST_PRICE_MIN_VOLATILITY,
|
|
DEFAULT_BEST_PRICE_MIN_VOLATILITY,
|
|
)
|
|
|
|
# "low" means no filtering (show at any volatility ≥0ct)
|
|
if min_volatility == "low":
|
|
return True
|
|
|
|
# "any" is legacy alias for "low" (no filtering)
|
|
if min_volatility == "any":
|
|
return True
|
|
|
|
# Get today's price data to calculate volatility
|
|
price_info = self.coordinator.data.get("priceInfo", {})
|
|
today_prices = price_info.get("today", [])
|
|
prices = [p.get("total") for p in today_prices if "total" in p] if today_prices else []
|
|
|
|
if not prices:
|
|
return True # If no prices, don't filter
|
|
|
|
# Calculate today's spread (volatility metric) in minor units
|
|
spread_major = (max(prices) - min(prices)) * 100
|
|
|
|
# Get volatility thresholds from config
|
|
threshold_moderate = self.coordinator.config_entry.options.get(
|
|
CONF_VOLATILITY_THRESHOLD_MODERATE,
|
|
DEFAULT_VOLATILITY_THRESHOLD_MODERATE,
|
|
)
|
|
threshold_high = self.coordinator.config_entry.options.get(
|
|
CONF_VOLATILITY_THRESHOLD_HIGH,
|
|
DEFAULT_VOLATILITY_THRESHOLD_HIGH,
|
|
)
|
|
threshold_very_high = self.coordinator.config_entry.options.get(
|
|
CONF_VOLATILITY_THRESHOLD_VERY_HIGH,
|
|
DEFAULT_VOLATILITY_THRESHOLD_VERY_HIGH,
|
|
)
|
|
|
|
# Map min_volatility to threshold and check if spread meets requirement
|
|
threshold_map = {
|
|
VOLATILITY_MODERATE: threshold_moderate,
|
|
VOLATILITY_HIGH: threshold_high,
|
|
VOLATILITY_VERY_HIGH: threshold_very_high,
|
|
}
|
|
|
|
required_threshold = threshold_map.get(min_volatility)
|
|
return spread_major >= required_threshold if required_threshold is not None else True
|
|
|
|
def _check_level_filter(self, *, reverse_sort: bool) -> bool:
|
|
"""
|
|
Check if today has any intervals that meet the level requirement.
|
|
|
|
Args:
|
|
reverse_sort: If False (best_price), checks max_level (upper bound filter).
|
|
If True (peak_price), checks min_level (lower bound filter).
|
|
|
|
Returns:
|
|
True if ANY interval meets the level requirement, False otherwise.
|
|
|
|
"""
|
|
# Get appropriate config based on sensor type
|
|
if reverse_sort:
|
|
# Peak price: minimum level filter (lower bound)
|
|
level_config = self.coordinator.config_entry.options.get(
|
|
CONF_PEAK_PRICE_MIN_LEVEL,
|
|
DEFAULT_PEAK_PRICE_MIN_LEVEL,
|
|
)
|
|
else:
|
|
# Best price: maximum level filter (upper bound)
|
|
level_config = self.coordinator.config_entry.options.get(
|
|
CONF_BEST_PRICE_MAX_LEVEL,
|
|
DEFAULT_BEST_PRICE_MAX_LEVEL,
|
|
)
|
|
|
|
# "any" means no level filtering
|
|
if level_config == "any":
|
|
return True
|
|
|
|
# Get today's intervals
|
|
price_info = self.coordinator.data.get("priceInfo", {})
|
|
today_intervals = price_info.get("today", [])
|
|
|
|
if not today_intervals:
|
|
return True # If no data, don't filter
|
|
|
|
# Check if ANY interval today meets the level requirement
|
|
# Note: level_config is lowercase from selector, but PRICE_LEVEL_MAPPING uses uppercase
|
|
level_order = PRICE_LEVEL_MAPPING.get(level_config.upper(), 0)
|
|
|
|
if reverse_sort:
|
|
# Peak price: level >= min_level (show if ANY interval is expensive enough)
|
|
return any(
|
|
PRICE_LEVEL_MAPPING.get(interval.get("level", "NORMAL"), 0) >= level_order
|
|
for interval in today_intervals
|
|
)
|
|
# Best price: level <= max_level (show if ANY interval is cheap enough)
|
|
return any(
|
|
PRICE_LEVEL_MAPPING.get(interval.get("level", "NORMAL"), 0) <= level_order for interval in today_intervals
|
|
)
|
|
|
|
def _build_no_periods_result(self) -> dict:
|
|
"""
|
|
Build result when no periods exist (not filtered, just none available).
|
|
|
|
Returns:
|
|
A dict with empty periods and timestamp.
|
|
|
|
"""
|
|
# Calculate timestamp: current time rounded down to last quarter hour
|
|
now = dt_util.now()
|
|
current_minute = (now.minute // 15) * 15
|
|
timestamp = now.replace(minute=current_minute, second=0, microsecond=0)
|
|
|
|
return {
|
|
"timestamp": timestamp,
|
|
"start": None,
|
|
"end": None,
|
|
"periods": [],
|
|
}
|
|
|
|
def _build_empty_periods_result(self, *, reverse_sort: bool) -> dict:
|
|
"""
|
|
Build result when periods are filtered due to volatility or level constraints.
|
|
|
|
Args:
|
|
reverse_sort: If False (best_price), reports max_level filter.
|
|
If True (peak_price), reports min_level filter.
|
|
|
|
Returns:
|
|
A dict with empty periods and a reason attribute explaining why.
|
|
|
|
"""
|
|
# Get appropriate volatility config based on sensor type
|
|
if reverse_sort:
|
|
min_volatility = self.coordinator.config_entry.options.get(
|
|
CONF_PEAK_PRICE_MIN_VOLATILITY,
|
|
DEFAULT_PEAK_PRICE_MIN_VOLATILITY,
|
|
)
|
|
else:
|
|
min_volatility = self.coordinator.config_entry.options.get(
|
|
CONF_BEST_PRICE_MIN_VOLATILITY,
|
|
DEFAULT_BEST_PRICE_MIN_VOLATILITY,
|
|
)
|
|
|
|
# Get appropriate level config based on sensor type
|
|
if reverse_sort:
|
|
level_config = self.coordinator.config_entry.options.get(
|
|
CONF_PEAK_PRICE_MIN_LEVEL,
|
|
DEFAULT_PEAK_PRICE_MIN_LEVEL,
|
|
)
|
|
level_filter_type = "below" # Peak price: level below min threshold
|
|
else:
|
|
level_config = self.coordinator.config_entry.options.get(
|
|
CONF_BEST_PRICE_MAX_LEVEL,
|
|
DEFAULT_BEST_PRICE_MAX_LEVEL,
|
|
)
|
|
level_filter_type = "above" # Best price: level above max threshold
|
|
|
|
# Build reason string explaining which filter(s) prevented display
|
|
reasons = []
|
|
if min_volatility != "any" and not self._check_volatility_filter(reverse_sort=reverse_sort):
|
|
reasons.append(f"volatility_below_{min_volatility}")
|
|
if level_config != "any" and not self._check_level_filter(reverse_sort=reverse_sort):
|
|
reasons.append(f"level_{level_filter_type}_{level_config}")
|
|
|
|
# Join multiple reasons with "and"
|
|
reason = "_and_".join(reasons) if reasons else "filtered"
|
|
|
|
# Calculate timestamp: current time rounded down to last quarter hour
|
|
now = dt_util.now()
|
|
current_minute = (now.minute // 15) * 15
|
|
timestamp = now.replace(minute=current_minute, second=0, microsecond=0)
|
|
|
|
return {
|
|
"timestamp": timestamp,
|
|
"start": None,
|
|
"end": None,
|
|
"periods": [],
|
|
"reason": reason,
|
|
}
|
|
|
|
def _find_current_or_next_interval(self, intervals: list[dict]) -> dict | None:
|
|
"""Find the current or next interval from the filtered list."""
|
|
now = dt_util.now()
|
|
# First pass: find currently active interval
|
|
for interval in intervals:
|
|
start = interval.get("_interval_start")
|
|
end = interval.get("_interval_end")
|
|
if start and end and start <= now < end:
|
|
return interval.copy()
|
|
# Second pass: find next future interval
|
|
for interval in intervals:
|
|
start = interval.get("_interval_start")
|
|
if start and start > now:
|
|
return interval.copy()
|
|
return None
|
|
|
|
def _build_periods_summary(self, intervals: list[dict], original_summaries: list[dict]) -> list[dict]:
|
|
"""
|
|
Build a summary of periods with consistent attribute structure.
|
|
|
|
Returns a list of period summaries with the same attributes as top-level,
|
|
making the structure predictable and easy to use in automations.
|
|
|
|
Args:
|
|
intervals: List of interval dictionaries with period information
|
|
original_summaries: Original period summaries from coordinator (with level/rating_level)
|
|
|
|
"""
|
|
if not intervals:
|
|
return []
|
|
|
|
# Build a lookup for original summaries by start time
|
|
original_lookup: dict[str, dict] = {}
|
|
for summary in original_summaries:
|
|
start = summary.get("start")
|
|
if start:
|
|
key = start.isoformat() if hasattr(start, "isoformat") else str(start)
|
|
original_lookup[key] = summary
|
|
|
|
# Group intervals by period (they have the same period_start)
|
|
periods_dict: dict[str, list[dict]] = {}
|
|
for interval in intervals:
|
|
period_key = interval.get("period_start")
|
|
if period_key:
|
|
key_str = period_key.isoformat() if hasattr(period_key, "isoformat") else str(period_key)
|
|
if key_str not in periods_dict:
|
|
periods_dict[key_str] = []
|
|
periods_dict[key_str].append(interval)
|
|
|
|
# Build summary for each period with consistent attribute names
|
|
summaries = []
|
|
for period_intervals in periods_dict.values():
|
|
if not period_intervals:
|
|
continue
|
|
|
|
first = period_intervals[0]
|
|
prices = [i["price"] for i in period_intervals if "price" in i]
|
|
|
|
# Get level and rating_level from original summaries first
|
|
aggregated_level = None
|
|
aggregated_rating_level = None
|
|
period_start = first.get("period_start")
|
|
if period_start:
|
|
key = period_start.isoformat() if hasattr(period_start, "isoformat") else str(period_start)
|
|
original = original_lookup.get(key)
|
|
if original:
|
|
aggregated_level = original.get("level")
|
|
aggregated_rating_level = original.get("rating_level")
|
|
|
|
# Follow attribute ordering from copilot-instructions.md
|
|
summary = {
|
|
"start": first.get("period_start"),
|
|
"end": first.get("period_end"),
|
|
"duration_minutes": first.get("duration_minutes"),
|
|
"level": aggregated_level,
|
|
"rating_level": aggregated_rating_level,
|
|
"price_avg": round(sum(prices) / len(prices), 2) if prices else 0,
|
|
"price_min": round(min(prices), 2) if prices else 0,
|
|
"price_max": round(max(prices), 2) if prices else 0,
|
|
"price_spread": round(max(prices) - min(prices), 2) if prices else 0,
|
|
"hour": first.get("hour"),
|
|
"minute": first.get("minute"),
|
|
"time": first.get("time"),
|
|
"periods_total": first.get("periods_total"),
|
|
"periods_remaining": first.get("periods_remaining"),
|
|
"period_position": first.get("period_position"),
|
|
"interval_count": len(period_intervals),
|
|
}
|
|
|
|
# Add price_diff attributes if present (price differences step 4)
|
|
self._add_price_diff_for_period(summary, period_intervals, first)
|
|
|
|
summaries.append(summary)
|
|
|
|
return summaries
|
|
|
|
def _build_final_attributes(
|
|
self,
|
|
current_interval: dict | None,
|
|
periods_summary: list[dict],
|
|
filtered_result: list[dict],
|
|
) -> dict:
|
|
"""
|
|
Build the final attributes dictionary from period summary and current interval.
|
|
|
|
Combines period-level attributes with current interval-specific attributes,
|
|
ensuring price_diff reflects the current interval's position vs daily min/max.
|
|
"""
|
|
now = dt_util.now()
|
|
current_minute = (now.minute // 15) * 15
|
|
timestamp = now.replace(minute=current_minute, second=0, microsecond=0)
|
|
|
|
if current_interval and periods_summary:
|
|
# Find the current period in the summary based on period_start
|
|
current_period_start = current_interval.get("period_start")
|
|
current_period_summary = None
|
|
|
|
for period in periods_summary:
|
|
if period.get("start") == current_period_start:
|
|
current_period_summary = period
|
|
break
|
|
|
|
if current_period_summary:
|
|
# Follow attribute ordering from copilot-instructions.md
|
|
attributes = {
|
|
"timestamp": timestamp,
|
|
"start": current_period_summary.get("start"),
|
|
"end": current_period_summary.get("end"),
|
|
"duration_minutes": current_period_summary.get("duration_minutes"),
|
|
"level": current_period_summary.get("level"),
|
|
"rating_level": current_period_summary.get("rating_level"),
|
|
"price_avg": current_period_summary.get("price_avg"),
|
|
"price_min": current_period_summary.get("price_min"),
|
|
"price_max": current_period_summary.get("price_max"),
|
|
"price_spread": current_period_summary.get("price_spread"),
|
|
"hour": current_period_summary.get("hour"),
|
|
"minute": current_period_summary.get("minute"),
|
|
"time": current_period_summary.get("time"),
|
|
"periods_total": current_period_summary.get("periods_total"),
|
|
"periods_remaining": current_period_summary.get("periods_remaining"),
|
|
"period_position": current_period_summary.get("period_position"),
|
|
"interval_count": current_period_summary.get("interval_count"),
|
|
}
|
|
|
|
# Add period price_diff attributes if present
|
|
if "period_price_diff_from_daily_min" in current_period_summary:
|
|
attributes["period_price_diff_from_daily_min"] = current_period_summary[
|
|
"period_price_diff_from_daily_min"
|
|
]
|
|
if "period_price_diff_from_daily_min_%" in current_period_summary:
|
|
attributes["period_price_diff_from_daily_min_%"] = current_period_summary[
|
|
"period_price_diff_from_daily_min_%"
|
|
]
|
|
elif "period_price_diff_from_daily_max" in current_period_summary:
|
|
attributes["period_price_diff_from_daily_max"] = current_period_summary[
|
|
"period_price_diff_from_daily_max"
|
|
]
|
|
if "period_price_diff_from_daily_max_%" in current_period_summary:
|
|
attributes["period_price_diff_from_daily_max_%"] = current_period_summary[
|
|
"period_price_diff_from_daily_max_%"
|
|
]
|
|
|
|
# Add interval-specific price_diff attributes (separate from period average)
|
|
# Shows the reference interval's position vs daily min/max:
|
|
# - If period is active: current 15-min interval vs daily min/max
|
|
# - If period hasn't started: first interval of the period vs daily min/max
|
|
# This value is what determines if an interval is part of a period (compared to flex setting)
|
|
if "price_diff_from_min" in current_interval:
|
|
attributes["interval_price_diff_from_daily_min"] = current_interval["price_diff_from_min"]
|
|
attributes["interval_price_diff_from_daily_min_%"] = current_interval.get("price_diff_from_min_%")
|
|
elif "price_diff_from_max" in current_interval:
|
|
attributes["interval_price_diff_from_daily_max"] = current_interval["price_diff_from_max"]
|
|
attributes["interval_price_diff_from_daily_max_%"] = current_interval.get("price_diff_from_max_%")
|
|
|
|
# Nested structures last (meta information step 6)
|
|
attributes["periods"] = periods_summary
|
|
return attributes
|
|
|
|
# Fallback if current period not found in summary
|
|
return {
|
|
"timestamp": timestamp,
|
|
"periods": periods_summary,
|
|
"interval_count": len(filtered_result),
|
|
}
|
|
|
|
# No periods found
|
|
return {
|
|
"timestamp": timestamp,
|
|
"periods": [],
|
|
"interval_count": 0,
|
|
}
|
|
|
|
def _add_price_diff_for_period(self, summary: dict, period_intervals: list[dict], first: dict) -> None:
|
|
"""
|
|
Add price difference attributes for the period based on sensor type.
|
|
|
|
Uses the reference price (min/max) from the start day of the period to ensure
|
|
consistent comparison, especially for periods spanning midnight.
|
|
|
|
Calculates how the period's average price compares to the daily min/max,
|
|
helping to explain why the period qualifies based on flex settings.
|
|
"""
|
|
# Determine sensor type and get the reference price from the first interval
|
|
# (which represents the start of the period and its day's reference value)
|
|
if "price_diff_from_min" in first:
|
|
# Best price sensor: calculate difference from the period's start day minimum
|
|
period_start = first.get("period_start")
|
|
if not period_start:
|
|
return
|
|
|
|
# Get all prices in minor units (cents/øre) from the period
|
|
prices = [i["price"] for i in period_intervals if "price" in i]
|
|
if not prices:
|
|
return
|
|
|
|
period_avg_price = sum(prices) / len(prices)
|
|
|
|
# Extract the reference min price from first interval's calculation
|
|
# We can back-calculate it from the first interval's price and diff
|
|
first_price_minor = first.get("price")
|
|
first_diff_minor = first.get("price_diff_from_min")
|
|
|
|
if first_price_minor is not None and first_diff_minor is not None:
|
|
ref_min_price = first_price_minor - first_diff_minor
|
|
period_diff = period_avg_price - ref_min_price
|
|
|
|
# Period average price difference from daily minimum
|
|
summary["period_price_diff_from_daily_min"] = round(period_diff, 2)
|
|
if ref_min_price != 0:
|
|
period_diff_pct = (period_diff / ref_min_price) * 100
|
|
summary["period_price_diff_from_daily_min_%"] = round(period_diff_pct, 2)
|
|
|
|
elif "price_diff_from_max" in first:
|
|
# Peak price sensor: calculate difference from the period's start day maximum
|
|
period_start = first.get("period_start")
|
|
if not period_start:
|
|
return
|
|
|
|
# Get all prices in minor units (cents/øre) from the period
|
|
prices = [i["price"] for i in period_intervals if "price" in i]
|
|
if not prices:
|
|
return
|
|
|
|
period_avg_price = sum(prices) / len(prices)
|
|
|
|
# Extract the reference max price from first interval's calculation
|
|
first_price_minor = first.get("price")
|
|
first_diff_minor = first.get("price_diff_from_max")
|
|
|
|
if first_price_minor is not None and first_diff_minor is not None:
|
|
ref_max_price = first_price_minor - first_diff_minor
|
|
period_diff = period_avg_price - ref_max_price
|
|
|
|
# Period average price difference from daily maximum
|
|
summary["period_price_diff_from_daily_max"] = round(period_diff, 2)
|
|
if ref_max_price != 0:
|
|
period_diff_pct = (period_diff / ref_max_price) * 100
|
|
summary["period_price_diff_from_daily_max_%"] = round(period_diff_pct, 2)
|
|
|
|
def _get_price_hours_attributes(self, *, attribute_name: str, reverse_sort: bool) -> dict | None:
|
|
"""Get price hours attributes."""
|
|
if not self.coordinator.data:
|
|
return None
|
|
|
|
price_info = self.coordinator.data.get("priceInfo", {})
|
|
|
|
today_prices = price_info.get("today", [])
|
|
if not today_prices:
|
|
return None
|
|
|
|
prices = [
|
|
(
|
|
datetime.fromisoformat(price["startsAt"]).hour,
|
|
float(price["total"]),
|
|
)
|
|
for price in today_prices
|
|
]
|
|
|
|
# Sort by price (high to low for peak, low to high for best)
|
|
sorted_hours = sorted(prices, key=lambda x: x[1], reverse=reverse_sort)[:5]
|
|
|
|
return {attribute_name: [{"hour": hour, "price": price} for hour, price in sorted_hours]}
|
|
|
|
@property
|
|
def is_on(self) -> bool | None:
|
|
"""Return true if the binary_sensor is on."""
|
|
try:
|
|
if not self.coordinator.data or not self._state_getter:
|
|
return None
|
|
|
|
return self._state_getter()
|
|
|
|
except (KeyError, ValueError, TypeError) as ex:
|
|
self.coordinator.logger.exception(
|
|
"Error getting binary sensor state",
|
|
extra={
|
|
"error": str(ex),
|
|
"entity": self.entity_description.key,
|
|
},
|
|
)
|
|
return None
|
|
|
|
@property
|
|
async def async_extra_state_attributes(self) -> dict | None:
|
|
"""Return additional state attributes asynchronously."""
|
|
try:
|
|
# Get the dynamic attributes if the getter is available
|
|
if not self.coordinator.data:
|
|
return None
|
|
|
|
attributes = {}
|
|
if self._attribute_getter:
|
|
dynamic_attrs = self._attribute_getter()
|
|
if dynamic_attrs:
|
|
# Copy and remove internal fields before exposing to user
|
|
clean_attrs = {k: v for k, v in dynamic_attrs.items() if not k.startswith("_")}
|
|
attributes.update(clean_attrs)
|
|
|
|
# Add descriptions from the custom translations file
|
|
if self.entity_description.translation_key and self.hass is not None:
|
|
# Get user's language preference
|
|
language = self.hass.config.language if self.hass.config.language else "en"
|
|
|
|
# Add basic description
|
|
description = await async_get_entity_description(
|
|
self.hass,
|
|
"binary_sensor",
|
|
self.entity_description.translation_key,
|
|
language,
|
|
"description",
|
|
)
|
|
if description:
|
|
attributes["description"] = description
|
|
|
|
# Check if extended descriptions are enabled in the config
|
|
extended_descriptions = self.coordinator.config_entry.options.get(
|
|
CONF_EXTENDED_DESCRIPTIONS,
|
|
self.coordinator.config_entry.data.get(CONF_EXTENDED_DESCRIPTIONS, DEFAULT_EXTENDED_DESCRIPTIONS),
|
|
)
|
|
|
|
# Add extended descriptions if enabled
|
|
if extended_descriptions:
|
|
# Add long description if available
|
|
long_desc = await async_get_entity_description(
|
|
self.hass,
|
|
"binary_sensor",
|
|
self.entity_description.translation_key,
|
|
language,
|
|
"long_description",
|
|
)
|
|
if long_desc:
|
|
attributes["long_description"] = long_desc
|
|
|
|
# Add usage tips if available
|
|
usage_tips = await async_get_entity_description(
|
|
self.hass,
|
|
"binary_sensor",
|
|
self.entity_description.translation_key,
|
|
language,
|
|
"usage_tips",
|
|
)
|
|
if usage_tips:
|
|
attributes["usage_tips"] = usage_tips
|
|
|
|
except (KeyError, ValueError, TypeError) as ex:
|
|
self.coordinator.logger.exception(
|
|
"Error getting binary sensor attributes",
|
|
extra={
|
|
"error": str(ex),
|
|
"entity": self.entity_description.key,
|
|
},
|
|
)
|
|
return None
|
|
else:
|
|
return attributes if attributes else None
|
|
|
|
@property
|
|
def extra_state_attributes(self) -> dict | None:
|
|
"""Return additional state attributes synchronously."""
|
|
try:
|
|
# Start with dynamic attributes if available
|
|
if not self.coordinator.data:
|
|
return None
|
|
|
|
attributes = {}
|
|
if self._attribute_getter:
|
|
dynamic_attrs = self._attribute_getter()
|
|
if dynamic_attrs:
|
|
# Copy and remove internal fields before exposing to user
|
|
clean_attrs = {k: v for k, v in dynamic_attrs.items() if not k.startswith("_")}
|
|
attributes.update(clean_attrs)
|
|
|
|
# Add descriptions from the cache (non-blocking)
|
|
if self.entity_description.translation_key and self.hass is not None:
|
|
# Get user's language preference
|
|
language = self.hass.config.language if self.hass.config.language else "en"
|
|
|
|
# Add basic description from cache
|
|
description = get_entity_description(
|
|
"binary_sensor",
|
|
self.entity_description.translation_key,
|
|
language,
|
|
"description",
|
|
)
|
|
if description:
|
|
attributes["description"] = description
|
|
|
|
# Check if extended descriptions are enabled in the config
|
|
extended_descriptions = self.coordinator.config_entry.options.get(
|
|
CONF_EXTENDED_DESCRIPTIONS,
|
|
self.coordinator.config_entry.data.get(CONF_EXTENDED_DESCRIPTIONS, DEFAULT_EXTENDED_DESCRIPTIONS),
|
|
)
|
|
|
|
# Add extended descriptions if enabled (from cache only)
|
|
if extended_descriptions:
|
|
# Add long description if available in cache
|
|
long_desc = get_entity_description(
|
|
"binary_sensor",
|
|
self.entity_description.translation_key,
|
|
language,
|
|
"long_description",
|
|
)
|
|
if long_desc:
|
|
attributes["long_description"] = long_desc
|
|
|
|
# Add usage tips if available in cache
|
|
usage_tips = get_entity_description(
|
|
"binary_sensor",
|
|
self.entity_description.translation_key,
|
|
language,
|
|
"usage_tips",
|
|
)
|
|
if usage_tips:
|
|
attributes["usage_tips"] = usage_tips
|
|
|
|
except (KeyError, ValueError, TypeError) as ex:
|
|
self.coordinator.logger.exception(
|
|
"Error getting binary sensor attributes",
|
|
extra={
|
|
"error": str(ex),
|
|
"entity": self.entity_description.key,
|
|
},
|
|
)
|
|
return None
|
|
else:
|
|
return attributes if attributes else None
|
|
|
|
async def async_update(self) -> None:
|
|
"""Force a refresh when homeassistant.update_entity is called."""
|
|
await self.coordinator.async_request_refresh()
|