hass.tibber_prices/custom_components/tibber_prices/binary_sensor.py
Julian Pawlowski f4568be34e feat(sensors): add price volatility analysis and period filters
Added comprehensive volatility analysis system:
- 4 new volatility sensors (today, tomorrow, next_24h, today+tomorrow)
- Volatility classification (LOW/MODERATE/HIGH/VERY HIGH) based on price spread
- Configurable thresholds in options flow (step 6 of 6)
- Best/Peak price period filters using volatility and price level
- Price spread calculation in get_price service

Volatility sensors help users decide if price-based optimization is worthwhile.
For example, battery optimization only makes sense when volatility ≥ MODERATE.

Period filters allow AND-logic combinations:
- best_price_min_volatility: Only show cheap periods on volatile days
- best_price_max_level: Only show periods when prices reach desired level
- peak_price_min_volatility: Only show peaks on volatile days
- peak_price_min_level: Only show peaks when expensive levels occur

All 5 language files updated (de, en, nb, nl, sv) with:
- Volatility sensor translations (name, states, descriptions)
- Config flow step 6 "Volatility" with threshold settings
- Step progress indicators added to all config steps
- Period filter translations with usage tips

Impact: Users can now assess daily price volatility and configure period
sensors to only activate when conditions justify battery cycling or load
shifting. Reduces unnecessary battery wear on low-volatility days.
2025-11-09 14:24:34 +00:00

1057 lines
44 KiB
Python

"""Binary sensor platform for tibber_prices."""
from __future__ import annotations
from datetime import datetime, timedelta
from typing import TYPE_CHECKING
from homeassistant.components.binary_sensor import (
BinarySensorDeviceClass,
BinarySensorEntity,
BinarySensorEntityDescription,
)
from homeassistant.const import PERCENTAGE, EntityCategory
from homeassistant.core import callback
from homeassistant.util import dt as dt_util
from .coordinator import TIME_SENSITIVE_ENTITY_KEYS
from .entity import TibberPricesEntity
if TYPE_CHECKING:
from collections.abc import Callable
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from .coordinator import TibberPricesDataUpdateCoordinator
from .data import TibberPricesConfigEntry
from .const import (
CONF_BEST_PRICE_MAX_LEVEL,
CONF_BEST_PRICE_MIN_VOLATILITY,
CONF_EXTENDED_DESCRIPTIONS,
CONF_MIN_VOLATILITY_FOR_PERIODS,
CONF_PEAK_PRICE_MIN_LEVEL,
CONF_PEAK_PRICE_MIN_VOLATILITY,
CONF_VOLATILITY_THRESHOLD_HIGH,
CONF_VOLATILITY_THRESHOLD_MODERATE,
CONF_VOLATILITY_THRESHOLD_VERY_HIGH,
DEFAULT_BEST_PRICE_MAX_LEVEL,
DEFAULT_BEST_PRICE_MIN_VOLATILITY,
DEFAULT_EXTENDED_DESCRIPTIONS,
DEFAULT_MIN_VOLATILITY_FOR_PERIODS,
DEFAULT_PEAK_PRICE_MIN_LEVEL,
DEFAULT_PEAK_PRICE_MIN_VOLATILITY,
DEFAULT_VOLATILITY_THRESHOLD_HIGH,
DEFAULT_VOLATILITY_THRESHOLD_MODERATE,
DEFAULT_VOLATILITY_THRESHOLD_VERY_HIGH,
PRICE_LEVEL_MAPPING,
VOLATILITY_HIGH,
VOLATILITY_LOW,
VOLATILITY_MODERATE,
VOLATILITY_VERY_HIGH,
async_get_entity_description,
get_entity_description,
)
# Length of one Tibber price interval in minutes (15-minute resolution).
MINUTES_PER_INTERVAL = 15
# Number of 15-minute intervals in a full day; tomorrow's data is considered
# complete only when exactly this many intervals are present.
MIN_TOMORROW_INTERVALS_15MIN = 96
# Static descriptions of every binary sensor this platform creates.
ENTITY_DESCRIPTIONS = (
    # On while the current time falls inside a detected peak-price period.
    BinarySensorEntityDescription(
        key="peak_price_period",
        translation_key="peak_price_period",
        name="Peak Price Interval",
        icon="mdi:clock-alert",
    ),
    # On while the current time falls inside a detected best-price period.
    BinarySensorEntityDescription(
        key="best_price_period",
        translation_key="best_price_period",
        name="Best Price Interval",
        icon="mdi:clock-check",
    ),
    # Diagnostic: reflects whether coordinator data is available at all.
    BinarySensorEntityDescription(
        key="connection",
        translation_key="connection",
        name="Tibber API Connection",
        device_class=BinarySensorDeviceClass.CONNECTIVITY,
        entity_category=EntityCategory.DIAGNOSTIC,
    ),
    # Diagnostic: on once tomorrow's full price curve has been published.
    BinarySensorEntityDescription(
        key="tomorrow_data_available",
        translation_key="tomorrow_data_available",
        name="Tomorrow's Data Available",
        icon="mdi:calendar-check",
        entity_category=EntityCategory.DIAGNOSTIC,
    ),
)
async def async_setup_entry(
    _hass: HomeAssistant,
    entry: TibberPricesConfigEntry,
    async_add_entities: AddEntitiesCallback,
) -> None:
    """Set up the binary_sensor platform."""
    # One sensor per static description, all sharing the entry's coordinator.
    coordinator = entry.runtime_data.coordinator
    sensors = [
        TibberPricesBinarySensor(
            coordinator=coordinator,
            entity_description=description,
        )
        for description in ENTITY_DESCRIPTIONS
    ]
    async_add_entities(sensors)
class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
    """tibber_prices binary_sensor class."""

    def __init__(
        self,
        coordinator: TibberPricesDataUpdateCoordinator,
        entity_description: BinarySensorEntityDescription,
    ) -> None:
        """Initialize the binary_sensor class."""
        super().__init__(coordinator)
        self.entity_description = entity_description
        # Unique per config entry + sensor key so HA can track the entity.
        self._attr_unique_id = f"{coordinator.config_entry.entry_id}_{entity_description.key}"
        # Resolve the state/attribute callbacks once, keyed on the sensor type.
        self._state_getter: Callable | None = self._get_state_getter()
        self._attribute_getter: Callable | None = self._get_attribute_getter()
        # Unsubscribe callback for time-sensitive updates; set in async_added_to_hass.
        self._time_sensitive_remove_listener: Callable | None = None
    async def async_added_to_hass(self) -> None:
        """When entity is added to hass."""
        await super().async_added_to_hass()
        # Register with coordinator for time-sensitive updates if applicable.
        # These sensors change state on interval boundaries, not only when new
        # data arrives, so they need the extra time-driven callback.
        if self.entity_description.key in TIME_SENSITIVE_ENTITY_KEYS:
            self._time_sensitive_remove_listener = self.coordinator.async_add_time_sensitive_listener(
                self._handle_time_sensitive_update
            )
    async def async_will_remove_from_hass(self) -> None:
        """When entity will be removed from hass."""
        await super().async_will_remove_from_hass()
        # Remove time-sensitive listener if registered, and drop the handle so
        # a second removal is a no-op.
        if self._time_sensitive_remove_listener:
            self._time_sensitive_remove_listener()
            self._time_sensitive_remove_listener = None
    @callback
    def _handle_time_sensitive_update(self) -> None:
        """Handle time-sensitive update from coordinator."""
        # Re-render state and attributes; no I/O, safe inside the event loop.
        self.async_write_ha_state()
def _get_state_getter(self) -> Callable | None:
"""Return the appropriate state getter method based on the sensor type."""
key = self.entity_description.key
if key == "peak_price_period":
return self._peak_price_state
if key == "best_price_period":
return self._best_price_state
if key == "connection":
return lambda: True if self.coordinator.data else None
if key == "tomorrow_data_available":
return self._tomorrow_data_available_state
return None
def _best_price_state(self) -> bool | None:
"""Return True if the current time is within a best price period."""
if not self.coordinator.data:
return None
attrs = self._get_price_intervals_attributes(reverse_sort=False)
if not attrs:
return False # Should not happen, but safety fallback
start = attrs.get("start")
end = attrs.get("end")
if not start or not end:
return False # No period found = sensor is off
now = dt_util.now()
return start <= now < end
def _peak_price_state(self) -> bool | None:
"""Return True if the current time is within a peak price period."""
if not self.coordinator.data:
return None
attrs = self._get_price_intervals_attributes(reverse_sort=True)
if not attrs:
return False # Should not happen, but safety fallback
start = attrs.get("start")
end = attrs.get("end")
if not start or not end:
return False # No period found = sensor is off
now = dt_util.now()
return start <= now < end
def _tomorrow_data_available_state(self) -> bool | None:
"""Return True if tomorrow's data is fully available, False if not, None if unknown."""
if not self.coordinator.data:
return None
price_info = self.coordinator.data.get("priceInfo", {})
tomorrow_prices = price_info.get("tomorrow", [])
interval_count = len(tomorrow_prices)
if interval_count == MIN_TOMORROW_INTERVALS_15MIN:
return True
if interval_count == 0:
return False
return False
def _get_tomorrow_data_available_attributes(self) -> dict | None:
"""Return attributes for tomorrow_data_available binary sensor."""
if not self.coordinator.data:
return None
price_info = self.coordinator.data.get("priceInfo", {})
tomorrow_prices = price_info.get("tomorrow", [])
interval_count = len(tomorrow_prices)
if interval_count == 0:
status = "none"
elif interval_count == MIN_TOMORROW_INTERVALS_15MIN:
status = "full"
else:
status = "partial"
return {
"intervals_available": interval_count,
"data_status": status,
}
def _get_attribute_getter(self) -> Callable | None:
"""Return the appropriate attribute getter method based on the sensor type."""
key = self.entity_description.key
if key == "peak_price_period":
return lambda: self._get_price_intervals_attributes(reverse_sort=True)
if key == "best_price_period":
return lambda: self._get_price_intervals_attributes(reverse_sort=False)
if key == "tomorrow_data_available":
return self._get_tomorrow_data_available_attributes
return None
def _get_precomputed_period_data(self, *, reverse_sort: bool) -> dict | None:
"""
Get precomputed period data from coordinator.
Returns lightweight period summaries (no full price data to avoid redundancy).
"""
if not self.coordinator.data:
return None
periods_data = self.coordinator.data.get("periods", {})
period_type = "peak_price" if reverse_sort else "best_price"
return periods_data.get(period_type)
    def _get_period_intervals_from_price_info(self, period_summaries: list[dict], *, reverse_sort: bool) -> list[dict]:
        """
        Build full interval data from period summaries and priceInfo.

        This avoids storing price data redundantly by fetching it on-demand from priceInfo.

        Args:
            period_summaries: Lightweight summaries from the coordinator
                (start/end, interval start timestamps, duration).
            reverse_sort: True for the peak-price sensor, False for best-price;
                selects which price-diff attribute names are emitted.

        Returns:
            A flat list of annotated interval dicts (period-level attributes
            repeated on each interval); empty list when data is missing.
        """
        if not self.coordinator.data or not period_summaries:
            return []
        price_info = self.coordinator.data.get("priceInfo", {})
        yesterday = price_info.get("yesterday", [])
        today = price_info.get("today", [])
        tomorrow = price_info.get("tomorrow", [])
        # Build a quick lookup for prices by timestamp
        all_prices = yesterday + today + tomorrow
        price_lookup = {}
        for price_data in all_prices:
            starts_at = dt_util.parse_datetime(price_data["startsAt"])
            if starts_at:
                # Keyed by local-time ISO string so it matches interval_starts entries.
                starts_at = dt_util.as_local(starts_at)
                price_lookup[starts_at.isoformat()] = price_data
        # Get reference data for annotations
        period_data = self._get_precomputed_period_data(reverse_sort=reverse_sort)
        if not period_data:
            return []
        ref_data = period_data.get("reference_data", {})
        # Per-day reference price (daily min or max) and per-day average price.
        ref_prices = ref_data.get("ref_prices", {})
        avg_prices = ref_data.get("avg_prices", {})
        # Build annotated intervals from period summaries
        intervals = []
        period_count = len(period_summaries)
        for period_idx, period_summary in enumerate(period_summaries, 1):
            period_start = period_summary.get("start")
            period_end = period_summary.get("end")
            interval_starts = period_summary.get("interval_starts", [])
            interval_count = len(interval_starts)
            duration_minutes = period_summary.get("duration_minutes", 0)
            periods_remaining = period_count - period_idx
            for interval_idx, start_iso in enumerate(interval_starts, 1):
                # Get price data from priceInfo; intervals without a price are skipped.
                price_data = price_lookup.get(start_iso)
                if not price_data:
                    continue
                starts_at = dt_util.parse_datetime(price_data["startsAt"])
                if not starts_at:
                    continue
                starts_at = dt_util.as_local(starts_at)
                # Reference values are stored per local calendar day.
                date_key = starts_at.date().isoformat()
                price_raw = float(price_data["total"])
                # Convert major currency units to minor (e.g. EUR -> cent).
                price_minor = round(price_raw * 100, 2)
                # Get reference values for this day
                ref_price = ref_prices.get(date_key, 0.0)
                avg_price = avg_prices.get(date_key, 0.0)
                # Calculate price difference vs the day's reference value
                price_diff = price_raw - ref_price
                price_diff_minor = round(price_diff * 100, 2)
                price_diff_pct = (price_diff / ref_price) * 100 if ref_price != 0 else 0.0
                interval_remaining = interval_count - interval_idx
                interval_end = starts_at + timedelta(minutes=MINUTES_PER_INTERVAL)
                annotated = {
                    # Period-level attributes
                    "period_start": period_start,
                    "period_end": period_end,
                    "hour": period_start.hour if period_start else None,
                    "minute": period_start.minute if period_start else None,
                    "time": f"{period_start.hour:02d}:{period_start.minute:02d}" if period_start else None,
                    "duration_minutes": duration_minutes,
                    "remaining_minutes_in_period": interval_remaining * MINUTES_PER_INTERVAL,
                    "periods_total": period_count,
                    "periods_remaining": periods_remaining,
                    "period_position": period_idx,
                    # Interval-level attributes
                    "price": price_minor,
                    # Internal fields (underscore-prefixed; stripped before exposure)
                    "_interval_start": starts_at,
                    "_interval_end": interval_end,
                    "_ref_price": ref_price,
                    "_avg_price": avg_price,
                }
                # Add price difference attributes based on sensor type
                if reverse_sort:
                    annotated["price_diff_from_max"] = price_diff_minor
                    annotated[f"price_diff_from_max_{PERCENTAGE}"] = round(price_diff_pct, 2)
                else:
                    annotated["price_diff_from_min"] = price_diff_minor
                    annotated[f"price_diff_from_min_{PERCENTAGE}"] = round(price_diff_pct, 2)
                intervals.append(annotated)
        return intervals
    def _get_price_intervals_attributes(self, *, reverse_sort: bool) -> dict | None:
        """
        Get price interval attributes using precomputed data from coordinator.

        This method now:
        1. Gets lightweight period summaries from coordinator
        2. Fetches actual price data from priceInfo on-demand
        3. Builds annotations without storing data redundantly
        4. Filters periods based on volatility and level thresholds if configured
        """
        # Check if periods should be filtered based on volatility and level;
        # a filtered-out result still carries a "reason" attribute.
        if not self._should_show_periods(reverse_sort=reverse_sort):
            return self._build_empty_periods_result(reverse_sort=reverse_sort)
        # Get precomputed period summaries from coordinator
        period_data = self._get_precomputed_period_data(reverse_sort=reverse_sort)
        if not period_data:
            return self._build_no_periods_result()
        period_summaries = period_data.get("periods", [])
        if not period_summaries:
            return self._build_no_periods_result()
        # Build full interval data from summaries + priceInfo
        intervals = self._get_period_intervals_from_price_info(period_summaries, reverse_sort=reverse_sort)
        if not intervals:
            return self._build_no_periods_result()
        # Find current or next interval
        current_interval = self._find_current_or_next_interval(intervals)
        # Build periods summary (merge with original summaries to include level/rating_level)
        periods_summary = self._build_periods_summary(intervals, period_summaries)
        # Build final attributes
        return self._build_final_attributes(current_interval, periods_summary, intervals)
def _should_show_periods(self, *, reverse_sort: bool) -> bool:
"""
Check if periods should be shown based on volatility AND level filters (UND-Verknüpfung).
Args:
reverse_sort: If False (best_price), checks max_level filter.
If True (peak_price), checks min_level filter.
Returns:
True if periods should be displayed, False if they should be filtered out.
Both conditions must be met for periods to be shown.
"""
if not self.coordinator.data:
return True
# Check volatility filter
if not self._check_volatility_filter(reverse_sort=reverse_sort):
return False
# Check level filter (UND-Verknüpfung)
return self._check_level_filter(reverse_sort=reverse_sort)
def _check_volatility_filter(self, *, reverse_sort: bool) -> bool:
"""
Check if today's volatility meets the minimum requirement.
Args:
reverse_sort: If False (best_price), uses CONF_BEST_PRICE_MIN_VOLATILITY.
If True (peak_price), uses CONF_PEAK_PRICE_MIN_VOLATILITY.
"""
# Get appropriate volatility config based on sensor type
if reverse_sort:
# Peak price sensor
min_volatility = self.coordinator.config_entry.options.get(
CONF_PEAK_PRICE_MIN_VOLATILITY,
# Migration: fall back to old global filter, then to new default
self.coordinator.config_entry.options.get(
CONF_MIN_VOLATILITY_FOR_PERIODS,
DEFAULT_PEAK_PRICE_MIN_VOLATILITY,
),
)
else:
# Best price sensor
min_volatility = self.coordinator.config_entry.options.get(
CONF_BEST_PRICE_MIN_VOLATILITY,
# Migration: fall back to old global filter, then to new default
self.coordinator.config_entry.options.get(
CONF_MIN_VOLATILITY_FOR_PERIODS,
DEFAULT_BEST_PRICE_MIN_VOLATILITY,
),
)
# "LOW" means no filtering (show at any volatility ≥0ct)
if min_volatility == VOLATILITY_LOW:
return True
# Legacy migration: "ANY" also means no filtering
if min_volatility == "ANY":
return True
# Get today's price data to calculate volatility
price_info = self.coordinator.data.get("priceInfo", {})
today_prices = price_info.get("today", [])
prices = [p.get("total") for p in today_prices if "total" in p] if today_prices else []
if not prices:
return True # If no prices, don't filter
# Calculate today's spread (volatility metric) in minor units
spread_major = (max(prices) - min(prices)) * 100
# Get volatility thresholds from config
threshold_moderate = self.coordinator.config_entry.options.get(
CONF_VOLATILITY_THRESHOLD_MODERATE,
DEFAULT_VOLATILITY_THRESHOLD_MODERATE,
)
threshold_high = self.coordinator.config_entry.options.get(
CONF_VOLATILITY_THRESHOLD_HIGH,
DEFAULT_VOLATILITY_THRESHOLD_HIGH,
)
threshold_very_high = self.coordinator.config_entry.options.get(
CONF_VOLATILITY_THRESHOLD_VERY_HIGH,
DEFAULT_VOLATILITY_THRESHOLD_VERY_HIGH,
)
# Map min_volatility to threshold and check if spread meets requirement
threshold_map = {
VOLATILITY_MODERATE: threshold_moderate,
VOLATILITY_HIGH: threshold_high,
VOLATILITY_VERY_HIGH: threshold_very_high,
}
required_threshold = threshold_map.get(min_volatility)
return spread_major >= required_threshold if required_threshold is not None else True
def _check_level_filter(self, *, reverse_sort: bool) -> bool:
"""
Check if today has any intervals that meet the level requirement.
Args:
reverse_sort: If False (best_price), checks max_level (upper bound filter).
If True (peak_price), checks min_level (lower bound filter).
Returns:
True if ANY interval meets the level requirement, False otherwise.
"""
# Get appropriate config based on sensor type
if reverse_sort:
# Peak price: minimum level filter (lower bound)
level_config = self.coordinator.config_entry.options.get(
CONF_PEAK_PRICE_MIN_LEVEL,
DEFAULT_PEAK_PRICE_MIN_LEVEL,
)
else:
# Best price: maximum level filter (upper bound)
level_config = self.coordinator.config_entry.options.get(
CONF_BEST_PRICE_MAX_LEVEL,
DEFAULT_BEST_PRICE_MAX_LEVEL,
)
# "ANY" means no level filtering
if level_config == "ANY":
return True
# Get today's intervals
price_info = self.coordinator.data.get("priceInfo", {})
today_intervals = price_info.get("today", [])
if not today_intervals:
return True # If no data, don't filter
# Check if ANY interval today meets the level requirement
level_order = PRICE_LEVEL_MAPPING.get(level_config, 0)
if reverse_sort:
# Peak price: level >= min_level (show if ANY interval is expensive enough)
return any(
PRICE_LEVEL_MAPPING.get(interval.get("level", "NORMAL"), 0) >= level_order
for interval in today_intervals
)
# Best price: level <= max_level (show if ANY interval is cheap enough)
return any(
PRICE_LEVEL_MAPPING.get(interval.get("level", "NORMAL"), 0) <= level_order for interval in today_intervals
)
def _build_no_periods_result(self) -> dict:
"""
Build result when no periods exist (not filtered, just none available).
Returns:
A dict with empty periods and timestamp.
"""
# Calculate timestamp: current time rounded down to last quarter hour
now = dt_util.now()
current_minute = (now.minute // 15) * 15
timestamp = now.replace(minute=current_minute, second=0, microsecond=0)
return {
"timestamp": timestamp,
"start": None,
"end": None,
"periods": [],
}
def _build_empty_periods_result(self, *, reverse_sort: bool) -> dict:
"""
Build result when periods are filtered due to volatility or level constraints.
Args:
reverse_sort: If False (best_price), reports max_level filter.
If True (peak_price), reports min_level filter.
Returns:
A dict with empty periods and a reason attribute explaining why.
"""
min_volatility = self.coordinator.config_entry.options.get(
CONF_MIN_VOLATILITY_FOR_PERIODS,
DEFAULT_MIN_VOLATILITY_FOR_PERIODS,
)
# Get appropriate level config based on sensor type
if reverse_sort:
level_config = self.coordinator.config_entry.options.get(
CONF_PEAK_PRICE_MIN_LEVEL,
DEFAULT_PEAK_PRICE_MIN_LEVEL,
)
level_filter_type = "below" # Peak price: level below min threshold
else:
level_config = self.coordinator.config_entry.options.get(
CONF_BEST_PRICE_MAX_LEVEL,
DEFAULT_BEST_PRICE_MAX_LEVEL,
)
level_filter_type = "above" # Best price: level above max threshold
# Build reason string explaining which filter(s) prevented display
reasons = []
if min_volatility != "ANY" and not self._check_volatility_filter(reverse_sort=reverse_sort):
reasons.append(f"volatility_below_{min_volatility.lower()}")
if level_config != "ANY" and not self._check_level_filter(reverse_sort=reverse_sort):
reasons.append(f"level_{level_filter_type}_{level_config.lower()}")
# Join multiple reasons with "and"
reason = "_and_".join(reasons) if reasons else "filtered"
# Calculate timestamp: current time rounded down to last quarter hour
now = dt_util.now()
current_minute = (now.minute // 15) * 15
timestamp = now.replace(minute=current_minute, second=0, microsecond=0)
return {
"timestamp": timestamp,
"start": None,
"end": None,
"periods": [],
"reason": reason,
}
def _find_current_or_next_interval(self, intervals: list[dict]) -> dict | None:
"""Find the current or next interval from the filtered list."""
now = dt_util.now()
# First pass: find currently active interval
for interval in intervals:
start = interval.get("_interval_start")
end = interval.get("_interval_end")
if start and end and start <= now < end:
return interval.copy()
# Second pass: find next future interval
for interval in intervals:
start = interval.get("_interval_start")
if start and start > now:
return interval.copy()
return None
    def _build_periods_summary(self, intervals: list[dict], original_summaries: list[dict]) -> list[dict]:
        """
        Build a summary of periods with consistent attribute structure.

        Returns a list of period summaries with the same attributes as top-level,
        making the structure predictable and easy to use in automations.

        Args:
            intervals: List of interval dictionaries with period information
            original_summaries: Original period summaries from coordinator (with level/rating_level)
        """
        if not intervals:
            return []
        # Build a lookup for original summaries by start time (ISO string key
        # so datetime and string starts are handled uniformly).
        original_lookup: dict[str, dict] = {}
        for summary in original_summaries:
            start = summary.get("start")
            if start:
                key = start.isoformat() if hasattr(start, "isoformat") else str(start)
                original_lookup[key] = summary
        # Group intervals by period (they have the same period_start)
        periods_dict: dict[str, list[dict]] = {}
        for interval in intervals:
            period_key = interval.get("period_start")
            if period_key:
                key_str = period_key.isoformat() if hasattr(period_key, "isoformat") else str(period_key)
                if key_str not in periods_dict:
                    periods_dict[key_str] = []
                periods_dict[key_str].append(interval)
        # Build summary for each period with consistent attribute names
        summaries = []
        for period_intervals in periods_dict.values():
            if not period_intervals:
                continue
            # The first interval carries the period-level attributes (they are
            # repeated on every interval of the same period).
            first = period_intervals[0]
            prices = [i["price"] for i in period_intervals if "price" in i]
            # Get level and rating_level from original summaries first
            aggregated_level = None
            aggregated_rating_level = None
            period_start = first.get("period_start")
            if period_start:
                key = period_start.isoformat() if hasattr(period_start, "isoformat") else str(period_start)
                original = original_lookup.get(key)
                if original:
                    aggregated_level = original.get("level")
                    aggregated_rating_level = original.get("rating_level")
            # Follow attribute ordering from copilot-instructions.md
            summary = {
                "start": first.get("period_start"),
                "end": first.get("period_end"),
                "duration_minutes": first.get("duration_minutes"),
                "level": aggregated_level,
                "rating_level": aggregated_rating_level,
                "price_avg": round(sum(prices) / len(prices), 2) if prices else 0,
                "price_min": round(min(prices), 2) if prices else 0,
                "price_max": round(max(prices), 2) if prices else 0,
                "price_spread": round(max(prices) - min(prices), 2) if prices else 0,
                "hour": first.get("hour"),
                "minute": first.get("minute"),
                "time": first.get("time"),
                "periods_total": first.get("periods_total"),
                "periods_remaining": first.get("periods_remaining"),
                "period_position": first.get("period_position"),
                "interval_count": len(period_intervals),
            }
            # Add price_diff attributes if present (price differences step 4)
            self._add_price_diff_for_period(summary, period_intervals, first)
            summaries.append(summary)
        return summaries
    def _build_final_attributes(
        self,
        current_interval: dict | None,
        periods_summary: list[dict],
        filtered_result: list[dict],
    ) -> dict:
        """
        Build the final attributes dictionary from period summary and current interval.

        Combines period-level attributes with current interval-specific attributes,
        ensuring price_diff reflects the current interval's position vs daily min/max.

        Args:
            current_interval: The active or next-upcoming interval (may be None).
            periods_summary: Per-period summaries from _build_periods_summary.
            filtered_result: All annotated intervals (used only for the fallback count).
        """
        # Timestamp: "now" rounded down to the start of the running 15-min interval.
        now = dt_util.now()
        current_minute = (now.minute // 15) * 15
        timestamp = now.replace(minute=current_minute, second=0, microsecond=0)
        if current_interval and periods_summary:
            # Find the current period in the summary based on period_start
            current_period_start = current_interval.get("period_start")
            current_period_summary = None
            for period in periods_summary:
                if period.get("start") == current_period_start:
                    current_period_summary = period
                    break
            if current_period_summary:
                # Follow attribute ordering from copilot-instructions.md
                attributes = {
                    "timestamp": timestamp,
                    "start": current_period_summary.get("start"),
                    "end": current_period_summary.get("end"),
                    "duration_minutes": current_period_summary.get("duration_minutes"),
                    "level": current_period_summary.get("level"),
                    "rating_level": current_period_summary.get("rating_level"),
                    "price_avg": current_period_summary.get("price_avg"),
                    "price_min": current_period_summary.get("price_min"),
                    "price_max": current_period_summary.get("price_max"),
                    "price_spread": current_period_summary.get("price_spread"),
                    "hour": current_period_summary.get("hour"),
                    "minute": current_period_summary.get("minute"),
                    "time": current_period_summary.get("time"),
                    "periods_total": current_period_summary.get("periods_total"),
                    "periods_remaining": current_period_summary.get("periods_remaining"),
                    "period_position": current_period_summary.get("period_position"),
                    "interval_count": current_period_summary.get("interval_count"),
                }
                # Add period price_diff attributes if present (min for best-price,
                # max for peak-price; only one family is ever set per sensor).
                if "period_price_diff_from_daily_min" in current_period_summary:
                    attributes["period_price_diff_from_daily_min"] = current_period_summary[
                        "period_price_diff_from_daily_min"
                    ]
                    if "period_price_diff_from_daily_min_%" in current_period_summary:
                        attributes["period_price_diff_from_daily_min_%"] = current_period_summary[
                            "period_price_diff_from_daily_min_%"
                        ]
                elif "period_price_diff_from_daily_max" in current_period_summary:
                    attributes["period_price_diff_from_daily_max"] = current_period_summary[
                        "period_price_diff_from_daily_max"
                    ]
                    if "period_price_diff_from_daily_max_%" in current_period_summary:
                        attributes["period_price_diff_from_daily_max_%"] = current_period_summary[
                            "period_price_diff_from_daily_max_%"
                        ]
                # Add interval-specific price_diff attributes (separate from period average)
                # Shows the reference interval's position vs daily min/max:
                # - If period is active: current 15-min interval vs daily min/max
                # - If period hasn't started: first interval of the period vs daily min/max
                # This value is what determines if an interval is part of a period (compared to flex setting)
                if "price_diff_from_min" in current_interval:
                    attributes["interval_price_diff_from_daily_min"] = current_interval["price_diff_from_min"]
                    attributes["interval_price_diff_from_daily_min_%"] = current_interval.get("price_diff_from_min_%")
                elif "price_diff_from_max" in current_interval:
                    attributes["interval_price_diff_from_daily_max"] = current_interval["price_diff_from_max"]
                    attributes["interval_price_diff_from_daily_max_%"] = current_interval.get("price_diff_from_max_%")
                # Nested structures last (meta information step 6)
                attributes["periods"] = periods_summary
                return attributes
            # Fallback if current period not found in summary
            return {
                "timestamp": timestamp,
                "periods": periods_summary,
                "interval_count": len(filtered_result),
            }
        # No periods found
        return {
            "timestamp": timestamp,
            "periods": [],
            "interval_count": 0,
        }
def _add_price_diff_for_period(self, summary: dict, period_intervals: list[dict], first: dict) -> None:
"""
Add price difference attributes for the period based on sensor type.
Uses the reference price (min/max) from the start day of the period to ensure
consistent comparison, especially for periods spanning midnight.
Calculates how the period's average price compares to the daily min/max,
helping to explain why the period qualifies based on flex settings.
"""
# Determine sensor type and get the reference price from the first interval
# (which represents the start of the period and its day's reference value)
if "price_diff_from_min" in first:
# Best price sensor: calculate difference from the period's start day minimum
period_start = first.get("period_start")
if not period_start:
return
# Get all prices in minor units (cents/øre) from the period
prices = [i["price"] for i in period_intervals if "price" in i]
if not prices:
return
period_avg_price = sum(prices) / len(prices)
# Extract the reference min price from first interval's calculation
# We can back-calculate it from the first interval's price and diff
first_price_minor = first.get("price")
first_diff_minor = first.get("price_diff_from_min")
if first_price_minor is not None and first_diff_minor is not None:
ref_min_price = first_price_minor - first_diff_minor
period_diff = period_avg_price - ref_min_price
# Period average price difference from daily minimum
summary["period_price_diff_from_daily_min"] = round(period_diff, 2)
if ref_min_price != 0:
period_diff_pct = (period_diff / ref_min_price) * 100
summary["period_price_diff_from_daily_min_%"] = round(period_diff_pct, 2)
elif "price_diff_from_max" in first:
# Peak price sensor: calculate difference from the period's start day maximum
period_start = first.get("period_start")
if not period_start:
return
# Get all prices in minor units (cents/øre) from the period
prices = [i["price"] for i in period_intervals if "price" in i]
if not prices:
return
period_avg_price = sum(prices) / len(prices)
# Extract the reference max price from first interval's calculation
first_price_minor = first.get("price")
first_diff_minor = first.get("price_diff_from_max")
if first_price_minor is not None and first_diff_minor is not None:
ref_max_price = first_price_minor - first_diff_minor
period_diff = period_avg_price - ref_max_price
# Period average price difference from daily maximum
summary["period_price_diff_from_daily_max"] = round(period_diff, 2)
if ref_max_price != 0:
period_diff_pct = (period_diff / ref_max_price) * 100
summary["period_price_diff_from_daily_max_%"] = round(period_diff_pct, 2)
def _get_price_hours_attributes(self, *, attribute_name: str, reverse_sort: bool) -> dict | None:
"""Get price hours attributes."""
if not self.coordinator.data:
return None
price_info = self.coordinator.data.get("priceInfo", {})
today_prices = price_info.get("today", [])
if not today_prices:
return None
prices = [
(
datetime.fromisoformat(price["startsAt"]).hour,
float(price["total"]),
)
for price in today_prices
]
# Sort by price (high to low for peak, low to high for best)
sorted_hours = sorted(prices, key=lambda x: x[1], reverse=reverse_sort)[:5]
return {attribute_name: [{"hour": hour, "price": price} for hour, price in sorted_hours]}
@property
def is_on(self) -> bool | None:
"""Return true if the binary_sensor is on."""
try:
if not self.coordinator.data or not self._state_getter:
return None
return self._state_getter()
except (KeyError, ValueError, TypeError) as ex:
self.coordinator.logger.exception(
"Error getting binary sensor state",
extra={
"error": str(ex),
"entity": self.entity_description.key,
},
)
return None
    @property
    async def async_extra_state_attributes(self) -> dict | None:
        """Return additional state attributes asynchronously."""
        # NOTE(review): an `async def` under @property yields a coroutine on
        # attribute access; Home Assistant reads extra_state_attributes
        # synchronously - confirm some caller actually awaits this variant.
        try:
            # Get the dynamic attributes if the getter is available
            if not self.coordinator.data:
                return None
            attributes = {}
            if self._attribute_getter:
                dynamic_attrs = self._attribute_getter()
                if dynamic_attrs:
                    # Copy and remove internal fields before exposing to user
                    clean_attrs = {k: v for k, v in dynamic_attrs.items() if not k.startswith("_")}
                    attributes.update(clean_attrs)
            # Add descriptions from the custom translations file
            if self.entity_description.translation_key and self.hass is not None:
                # Get user's language preference
                language = self.hass.config.language if self.hass.config.language else "en"
                # Add basic description
                description = await async_get_entity_description(
                    self.hass,
                    "binary_sensor",
                    self.entity_description.translation_key,
                    language,
                    "description",
                )
                if description:
                    attributes["description"] = description
                # Check if extended descriptions are enabled in the config
                # (options take precedence over the original entry data).
                extended_descriptions = self.coordinator.config_entry.options.get(
                    CONF_EXTENDED_DESCRIPTIONS,
                    self.coordinator.config_entry.data.get(CONF_EXTENDED_DESCRIPTIONS, DEFAULT_EXTENDED_DESCRIPTIONS),
                )
                # Add extended descriptions if enabled
                if extended_descriptions:
                    # Add long description if available
                    long_desc = await async_get_entity_description(
                        self.hass,
                        "binary_sensor",
                        self.entity_description.translation_key,
                        language,
                        "long_description",
                    )
                    if long_desc:
                        attributes["long_description"] = long_desc
                    # Add usage tips if available
                    usage_tips = await async_get_entity_description(
                        self.hass,
                        "binary_sensor",
                        self.entity_description.translation_key,
                        language,
                        "usage_tips",
                    )
                    if usage_tips:
                        attributes["usage_tips"] = usage_tips
        except (KeyError, ValueError, TypeError) as ex:
            # Attributes stay unknown on error; log with entity context.
            self.coordinator.logger.exception(
                "Error getting binary sensor attributes",
                extra={
                    "error": str(ex),
                    "entity": self.entity_description.key,
                },
            )
            return None
        else:
            return attributes if attributes else None
    @property
    def extra_state_attributes(self) -> dict | None:
        """Return additional state attributes synchronously."""
        try:
            # Start with dynamic attributes if available
            if not self.coordinator.data:
                return None
            attributes = {}
            if self._attribute_getter:
                dynamic_attrs = self._attribute_getter()
                if dynamic_attrs:
                    # Copy and remove internal fields before exposing to user
                    clean_attrs = {k: v for k, v in dynamic_attrs.items() if not k.startswith("_")}
                    attributes.update(clean_attrs)
            # Add descriptions from the cache (non-blocking; the async property
            # variant is the one that may fetch and populate the cache).
            if self.entity_description.translation_key and self.hass is not None:
                # Get user's language preference
                language = self.hass.config.language if self.hass.config.language else "en"
                # Add basic description from cache
                description = get_entity_description(
                    "binary_sensor",
                    self.entity_description.translation_key,
                    language,
                    "description",
                )
                if description:
                    attributes["description"] = description
                # Check if extended descriptions are enabled in the config
                # (options take precedence over the original entry data).
                extended_descriptions = self.coordinator.config_entry.options.get(
                    CONF_EXTENDED_DESCRIPTIONS,
                    self.coordinator.config_entry.data.get(CONF_EXTENDED_DESCRIPTIONS, DEFAULT_EXTENDED_DESCRIPTIONS),
                )
                # Add extended descriptions if enabled (from cache only)
                if extended_descriptions:
                    # Add long description if available in cache
                    long_desc = get_entity_description(
                        "binary_sensor",
                        self.entity_description.translation_key,
                        language,
                        "long_description",
                    )
                    if long_desc:
                        attributes["long_description"] = long_desc
                    # Add usage tips if available in cache
                    usage_tips = get_entity_description(
                        "binary_sensor",
                        self.entity_description.translation_key,
                        language,
                        "usage_tips",
                    )
                    if usage_tips:
                        attributes["usage_tips"] = usage_tips
        except (KeyError, ValueError, TypeError) as ex:
            # Attributes stay unknown on error; log with entity context.
            self.coordinator.logger.exception(
                "Error getting binary sensor attributes",
                extra={
                    "error": str(ex),
                    "entity": self.entity_description.key,
                },
            )
            return None
        else:
            return attributes if attributes else None
    async def async_update(self) -> None:
        """Force a refresh when homeassistant.update_entity is called."""
        # Delegate to the coordinator so all entities sharing it refresh together.
        await self.coordinator.async_request_refresh()