hass.tibber_prices/custom_components/tibber_prices/binary_sensor.py

1158 lines
48 KiB
Python

"""Binary sensor platform for tibber_prices."""
from __future__ import annotations
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Any
from homeassistant.components.binary_sensor import (
BinarySensorDeviceClass,
BinarySensorEntity,
BinarySensorEntityDescription,
)
from homeassistant.const import PERCENTAGE, EntityCategory
from homeassistant.core import callback
from homeassistant.util import dt as dt_util
from .coordinator import TIME_SENSITIVE_ENTITY_KEYS
from .entity import TibberPricesEntity
from .sensor import find_price_data_for_interval
if TYPE_CHECKING:
from collections.abc import Callable
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from .coordinator import TibberPricesDataUpdateCoordinator
from .data import TibberPricesConfigEntry
from .const import (
CONF_BEST_PRICE_FLEX,
CONF_BEST_PRICE_MIN_DISTANCE_FROM_AVG,
CONF_BEST_PRICE_MIN_PERIOD_LENGTH,
CONF_EXTENDED_DESCRIPTIONS,
CONF_PEAK_PRICE_FLEX,
CONF_PEAK_PRICE_MIN_DISTANCE_FROM_AVG,
CONF_PEAK_PRICE_MIN_PERIOD_LENGTH,
DEFAULT_BEST_PRICE_FLEX,
DEFAULT_BEST_PRICE_MIN_DISTANCE_FROM_AVG,
DEFAULT_BEST_PRICE_MIN_PERIOD_LENGTH,
DEFAULT_EXTENDED_DESCRIPTIONS,
DEFAULT_PEAK_PRICE_FLEX,
DEFAULT_PEAK_PRICE_MIN_DISTANCE_FROM_AVG,
DEFAULT_PEAK_PRICE_MIN_PERIOD_LENGTH,
async_get_entity_description,
get_entity_description,
)
MINUTES_PER_INTERVAL = 15
MIN_TOMORROW_INTERVALS_15MIN = 96
ENTITY_DESCRIPTIONS = (
BinarySensorEntityDescription(
key="peak_price_period",
translation_key="peak_price_period",
name="Peak Price Interval",
icon="mdi:clock-alert",
),
BinarySensorEntityDescription(
key="best_price_period",
translation_key="best_price_period",
name="Best Price Interval",
icon="mdi:clock-check",
),
BinarySensorEntityDescription(
key="connection",
translation_key="connection",
name="Tibber API Connection",
device_class=BinarySensorDeviceClass.CONNECTIVITY,
entity_category=EntityCategory.DIAGNOSTIC,
),
BinarySensorEntityDescription(
key="tomorrow_data_available",
translation_key="tomorrow_data_available",
name="Tomorrow's Data Available",
icon="mdi:calendar-check",
entity_category=EntityCategory.DIAGNOSTIC,
),
)
async def async_setup_entry(
_hass: HomeAssistant,
entry: TibberPricesConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up the binary_sensor platform."""
async_add_entities(
TibberPricesBinarySensor(
coordinator=entry.runtime_data.coordinator,
entity_description=entity_description,
)
for entity_description in ENTITY_DESCRIPTIONS
)
class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
"""tibber_prices binary_sensor class."""
def __init__(
self,
coordinator: TibberPricesDataUpdateCoordinator,
entity_description: BinarySensorEntityDescription,
) -> None:
"""Initialize the binary_sensor class."""
super().__init__(coordinator)
self.entity_description = entity_description
self._attr_unique_id = f"{coordinator.config_entry.entry_id}_{entity_description.key}"
self._state_getter: Callable | None = self._get_state_getter()
self._attribute_getter: Callable | None = self._get_attribute_getter()
self._time_sensitive_remove_listener: Callable | None = None
# Cache for expensive period calculations to avoid recalculating twice
# (once for is_on, once for attributes)
self._period_cache: dict[str, Any] = {}
self._cache_key: str = ""
async def async_added_to_hass(self) -> None:
"""When entity is added to hass."""
await super().async_added_to_hass()
# Register with coordinator for time-sensitive updates if applicable
if self.entity_description.key in TIME_SENSITIVE_ENTITY_KEYS:
self._time_sensitive_remove_listener = self.coordinator.async_add_time_sensitive_listener(
self._handle_time_sensitive_update
)
async def async_will_remove_from_hass(self) -> None:
"""When entity will be removed from hass."""
await super().async_will_remove_from_hass()
# Remove time-sensitive listener if registered
if self._time_sensitive_remove_listener:
self._time_sensitive_remove_listener()
self._time_sensitive_remove_listener = None
@callback
def _handle_time_sensitive_update(self) -> None:
"""Handle time-sensitive update from coordinator."""
# Invalidate cache when data potentially changes
self._cache_key = ""
self._period_cache = {}
self.async_write_ha_state()
def _get_state_getter(self) -> Callable | None:
"""Return the appropriate state getter method based on the sensor type."""
key = self.entity_description.key
if key == "peak_price_period":
return self._peak_price_state
if key == "best_price_period":
return self._best_price_state
if key == "connection":
return lambda: True if self.coordinator.data else None
if key == "tomorrow_data_available":
return self._tomorrow_data_available_state
return None
def _generate_cache_key(self, *, reverse_sort: bool) -> str:
"""
Generate a cache key based on coordinator data and config options.
This ensures we recalculate when data or configuration changes,
but reuse cached results for multiple property accesses.
"""
if not self.coordinator.data:
return ""
# Include timestamp to invalidate when data changes
timestamp = self.coordinator.data.get("timestamp", "")
# Include relevant config options that affect period calculation
options = self.coordinator.config_entry.options
data = self.coordinator.config_entry.data
if reverse_sort:
flex = options.get(CONF_PEAK_PRICE_FLEX, data.get(CONF_PEAK_PRICE_FLEX, DEFAULT_PEAK_PRICE_FLEX))
min_dist = options.get(
CONF_PEAK_PRICE_MIN_DISTANCE_FROM_AVG,
data.get(CONF_PEAK_PRICE_MIN_DISTANCE_FROM_AVG, DEFAULT_PEAK_PRICE_MIN_DISTANCE_FROM_AVG),
)
min_len = options.get(
CONF_PEAK_PRICE_MIN_PERIOD_LENGTH,
data.get(CONF_PEAK_PRICE_MIN_PERIOD_LENGTH, DEFAULT_PEAK_PRICE_MIN_PERIOD_LENGTH),
)
else:
flex = options.get(CONF_BEST_PRICE_FLEX, data.get(CONF_BEST_PRICE_FLEX, DEFAULT_BEST_PRICE_FLEX))
min_dist = options.get(
CONF_BEST_PRICE_MIN_DISTANCE_FROM_AVG,
data.get(CONF_BEST_PRICE_MIN_DISTANCE_FROM_AVG, DEFAULT_BEST_PRICE_MIN_DISTANCE_FROM_AVG),
)
min_len = options.get(
CONF_BEST_PRICE_MIN_PERIOD_LENGTH,
data.get(CONF_BEST_PRICE_MIN_PERIOD_LENGTH, DEFAULT_BEST_PRICE_MIN_PERIOD_LENGTH),
)
return f"{timestamp}_{reverse_sort}_{flex}_{min_dist}_{min_len}"
def _get_flex_option(self, option_key: str, default: float) -> float:
"""
Get a float option from config entry.
Converts percentage values to decimal fractions.
- CONF_BEST_PRICE_FLEX: positive 0-100 → 0.0-1.0
- CONF_PEAK_PRICE_FLEX: negative -100 to 0 → -1.0 to 0.0
Args:
option_key: The config key (CONF_BEST_PRICE_FLEX or CONF_PEAK_PRICE_FLEX)
default: Default value to use if not found
Returns:
Value converted to decimal fraction (e.g., 5 → 0.05, -5 → -0.05)
"""
options = self.coordinator.config_entry.options
data = self.coordinator.config_entry.data
value = options.get(option_key, data.get(option_key, default))
try:
value = float(value) / 100
except (TypeError, ValueError):
value = default
return value
def _best_price_state(self) -> bool | None:
"""Return True if the current time is within a best price period."""
if not self.coordinator.data:
return None
attrs = self._get_price_intervals_attributes(reverse_sort=False)
if not attrs or "start" not in attrs or "end" not in attrs:
return None
now = dt_util.now()
start = attrs.get("start")
end = attrs.get("end")
return start <= now < end if start and end else None
def _peak_price_state(self) -> bool | None:
"""Return True if the current time is within a peak price period."""
if not self.coordinator.data:
return None
attrs = self._get_price_intervals_attributes(reverse_sort=True)
if not attrs or "start" not in attrs or "end" not in attrs:
return None
now = dt_util.now()
start = attrs.get("start")
end = attrs.get("end")
return start <= now < end if start and end else None
def _tomorrow_data_available_state(self) -> bool | None:
"""Return True if tomorrow's data is fully available, False if not, None if unknown."""
if not self.coordinator.data:
return None
price_info = self.coordinator.data.get("priceInfo", {})
tomorrow_prices = price_info.get("tomorrow", [])
interval_count = len(tomorrow_prices)
if interval_count == MIN_TOMORROW_INTERVALS_15MIN:
return True
if interval_count == 0:
return False
return False
def _get_tomorrow_data_available_attributes(self) -> dict | None:
"""Return attributes for tomorrow_data_available binary sensor."""
if not self.coordinator.data:
return None
price_info = self.coordinator.data.get("priceInfo", {})
tomorrow_prices = price_info.get("tomorrow", [])
interval_count = len(tomorrow_prices)
if interval_count == 0:
status = "none"
elif interval_count == MIN_TOMORROW_INTERVALS_15MIN:
status = "full"
else:
status = "partial"
return {
"intervals_available": interval_count,
"status": status,
}
def _get_attribute_getter(self) -> Callable | None:
"""Return the appropriate attribute getter method based on the sensor type."""
key = self.entity_description.key
if key == "peak_price_period":
return lambda: self._get_price_intervals_attributes(reverse_sort=True)
if key == "best_price_period":
return lambda: self._get_price_intervals_attributes(reverse_sort=False)
if key == "tomorrow_data_available":
return self._get_tomorrow_data_available_attributes
return None
def _get_current_price_data(self) -> tuple[list[float], float] | None:
"""Get current price data if available."""
if not self.coordinator.data:
return None
price_info = self.coordinator.data.get("priceInfo", {})
today_prices = price_info.get("today", [])
if not today_prices:
return None
now = dt_util.now()
current_interval_data = find_price_data_for_interval({"today": today_prices}, now)
if not current_interval_data:
return None
prices = [float(price["total"]) for price in today_prices]
prices.sort()
return prices, float(current_interval_data["total"])
def _annotate_single_interval(
self,
interval: dict,
annotation_ctx: dict,
) -> dict:
"""Annotate a single interval with all required attributes for Home Assistant UI and automations."""
interval_copy = interval.copy()
interval_remaining = annotation_ctx["interval_count"] - annotation_ctx["interval_idx"]
# Extract and keep internal interval fields for logic (with _ prefix)
interval_start = interval_copy.pop("interval_start", None)
interval_end = interval_copy.pop("interval_end", None)
# Remove other interval_* fields that are no longer needed
interval_copy.pop("interval_hour", None)
interval_copy.pop("interval_minute", None)
interval_copy.pop("interval_time", None)
# Remove startsAt - not needed anymore
interval_copy.pop("startsAt", None)
price_raw = interval_copy.pop("price", None)
new_interval = {
"period_start": annotation_ctx["period_start"],
"period_end": annotation_ctx["period_end"],
"hour": annotation_ctx["period_start_hour"],
"minute": annotation_ctx["period_start_minute"],
"time": annotation_ctx["period_start_time"],
"duration_minutes": annotation_ctx["period_length"],
"remaining_minutes_after_interval": interval_remaining * MINUTES_PER_INTERVAL,
"periods_total": annotation_ctx["period_count"],
"periods_remaining": annotation_ctx["periods_remaining"],
"period_position": annotation_ctx["period_idx"],
"price": round(price_raw * 100, 2) if price_raw is not None else None,
# Keep internal fields for logic (state checks, filtering)
"_interval_start": interval_start,
"_interval_end": interval_end,
}
new_interval.update(interval_copy)
# Add price_diff_from_min for best_price_period
if annotation_ctx.get("diff_key") == "price_diff_from_min":
price_diff = price_raw - annotation_ctx["ref_price"]
new_interval["price_diff_from_min"] = round(price_diff * 100, 2)
# Calculate percent difference from min price
price_diff_percent = (
((price_raw - annotation_ctx["ref_price"]) / annotation_ctx["ref_price"]) * 100
if annotation_ctx["ref_price"] != 0
else 0.0
)
new_interval["price_diff_from_min_" + PERCENTAGE] = round(price_diff_percent, 2)
# Add price_diff_from_max for peak_price_period
elif annotation_ctx.get("diff_key") == "price_diff_from_max":
price_diff = price_raw - annotation_ctx["ref_price"]
new_interval["price_diff_from_max"] = round(price_diff * 100, 2)
# Calculate percent difference from max price
price_diff_percent = (
((price_raw - annotation_ctx["ref_price"]) / annotation_ctx["ref_price"]) * 100
if annotation_ctx["ref_price"] != 0
else 0.0
)
new_interval["price_diff_from_max_" + PERCENTAGE] = round(price_diff_percent, 2)
return new_interval
def _annotate_period_intervals(
self,
periods: list[list[dict]],
ref_prices: dict,
avg_price_by_day: dict,
) -> list[dict]:
"""
Return flattened and annotated intervals with period info and requested properties.
Uses the correct reference price for each interval's date.
"""
reference_type = None
if self.entity_description.key == "best_price_period":
reference_type = "min"
elif self.entity_description.key == "peak_price_period":
reference_type = "max"
else:
reference_type = "ref"
if reference_type == "min":
diff_key = "price_diff_from_min"
diff_pct_key = "price_diff_from_min_" + PERCENTAGE
elif reference_type == "max":
diff_key = "price_diff_from_max"
diff_pct_key = "price_diff_from_max_" + PERCENTAGE
else:
diff_key = "price_diff"
diff_pct_key = "price_diff_" + PERCENTAGE
result = []
period_count = len(periods)
for period_idx, period in enumerate(periods, 1):
period_start = period[0]["interval_start"] if period else None
period_start_hour = period_start.hour if period_start else None
period_start_minute = period_start.minute if period_start else None
period_start_time = f"{period_start_hour:02d}:{period_start_minute:02d}" if period_start else None
period_end = period[-1]["interval_end"] if period else None
interval_count = len(period)
period_length = interval_count * MINUTES_PER_INTERVAL
periods_remaining = len(periods) - period_idx
for interval_idx, interval in enumerate(period, 1):
interval_start = interval.get("interval_start")
interval_date = interval_start.date() if interval_start else None
avg_price = avg_price_by_day.get(interval_date, 0)
ref_price = ref_prices.get(interval_date, 0)
annotation_ctx = {
"period_start": period_start,
"period_end": period_end,
"period_start_hour": period_start_hour,
"period_start_minute": period_start_minute,
"period_start_time": period_start_time,
"period_length": period_length,
"interval_count": interval_count,
"interval_idx": interval_idx,
"period_count": period_count,
"periods_remaining": periods_remaining,
"period_idx": period_idx,
"ref_price": ref_price,
"avg_price": avg_price,
"diff_key": diff_key,
"diff_pct_key": diff_pct_key,
}
new_interval = self._annotate_single_interval(
interval,
annotation_ctx,
)
result.append(new_interval)
return result
def _split_intervals_by_day(self, all_prices: list[dict]) -> tuple[dict, dict]:
"""Split intervals by day and calculate average price per day."""
intervals_by_day: dict = {}
avg_price_by_day: dict = {}
for price_data in all_prices:
dt = dt_util.parse_datetime(price_data["startsAt"])
if dt is None:
continue
date = dt.date()
intervals_by_day.setdefault(date, []).append(price_data)
for date, intervals in intervals_by_day.items():
avg_price_by_day[date] = sum(float(p["total"]) for p in intervals) / len(intervals)
return intervals_by_day, avg_price_by_day
def _calculate_reference_prices(self, intervals_by_day: dict, *, reverse_sort: bool) -> dict:
"""Calculate reference prices for each day."""
ref_prices: dict = {}
for date, intervals in intervals_by_day.items():
prices = [float(p["total"]) for p in intervals]
if reverse_sort is False:
ref_prices[date] = min(prices)
else:
ref_prices[date] = max(prices)
return ref_prices
def _build_periods(
self,
all_prices: list[dict],
price_context: dict,
*,
reverse_sort: bool,
) -> list[list[dict]]:
"""
Build periods, allowing periods to cross midnight (day boundary).
Strictly enforce flex threshold by percent diff, matching attribute calculation.
Additionally enforces:
1. Cap at daily average to prevent overlap between best and peak periods
2. Minimum distance from average to ensure meaningful price difference
Args:
all_prices: All price data points
price_context: Dict with ref_prices, avg_prices, flex, and min_distance_from_avg
reverse_sort: True for peak price (descending), False for best price (ascending)
Returns:
List of periods, each period is a list of interval dicts
"""
ref_prices = price_context["ref_prices"]
avg_prices = price_context["avg_prices"]
flex = price_context["flex"]
min_distance_from_avg = price_context["min_distance_from_avg"]
periods: list[list[dict]] = []
current_period: list[dict] = []
last_ref_date = None
for price_data in all_prices:
starts_at = dt_util.parse_datetime(price_data["startsAt"])
if starts_at is None:
continue
starts_at = dt_util.as_local(starts_at)
date = starts_at.date()
ref_price = ref_prices[date]
avg_price = avg_prices[date]
price = float(price_data["total"])
percent_diff = ((price - ref_price) / ref_price) * 100 if ref_price != 0 else 0.0
percent_diff = round(percent_diff, 2)
# For best price (flex >= 0): percent_diff <= flex*100 (prices up to flex% above reference)
# For peak price (flex <= 0): percent_diff >= flex*100 (prices down to |flex|% below reference)
in_flex = percent_diff <= flex * 100 if not reverse_sort else percent_diff >= flex * 100
# Cap at daily average to prevent overlap between best and peak periods
# Best price: only prices below average
# Peak price: only prices above average
within_avg_boundary = price <= avg_price if not reverse_sort else price >= avg_price
# Enforce minimum distance from average (in percentage terms)
# Best price: price must be at least min_distance_from_avg% below average
# Peak price: price must be at least min_distance_from_avg% above average
if not reverse_sort:
# Best price: price <= avg * (1 - min_distance_from_avg/100)
min_distance_threshold = avg_price * (1 - min_distance_from_avg / 100)
meets_min_distance = price <= min_distance_threshold
else:
# Peak price: price >= avg * (1 + min_distance_from_avg/100)
min_distance_threshold = avg_price * (1 + min_distance_from_avg / 100)
meets_min_distance = price >= min_distance_threshold
# Split period if day changes
if last_ref_date is not None and date != last_ref_date and current_period:
periods.append(current_period)
current_period = []
last_ref_date = date
if in_flex and within_avg_boundary and meets_min_distance:
current_period.append(
{
"interval_hour": starts_at.hour,
"interval_minute": starts_at.minute,
"interval_time": f"{starts_at.hour:02d}:{starts_at.minute:02d}",
"price": price,
"interval_start": starts_at,
}
)
elif current_period:
periods.append(current_period)
current_period = []
if current_period:
periods.append(current_period)
return periods
def _filter_periods_by_min_length(self, periods: list[list[dict]], *, reverse_sort: bool) -> list[list[dict]]:
"""
Filter periods to only include those meeting the minimum length requirement.
Args:
periods: List of periods (each period is a list of interval dicts)
reverse_sort: True for peak price, False for best price
Returns:
Filtered list of periods that meet minimum length requirement
"""
options = self.coordinator.config_entry.options
data = self.coordinator.config_entry.data
# Use appropriate config based on sensor type
if reverse_sort: # Peak price
conf_key = CONF_PEAK_PRICE_MIN_PERIOD_LENGTH
default = DEFAULT_PEAK_PRICE_MIN_PERIOD_LENGTH
else: # Best price
conf_key = CONF_BEST_PRICE_MIN_PERIOD_LENGTH
default = DEFAULT_BEST_PRICE_MIN_PERIOD_LENGTH
min_period_length = options.get(conf_key, data.get(conf_key, default))
# Convert minutes to number of 15-minute intervals
min_intervals = min_period_length // MINUTES_PER_INTERVAL
# Filter out periods that are too short
return [period for period in periods if len(period) >= min_intervals]
def _merge_adjacent_periods_at_midnight(self, periods: list[list[dict]]) -> list[list[dict]]:
"""
Merge adjacent periods that meet at midnight.
When two periods are detected separately for today and tomorrow due to different
daily average prices, but they are directly adjacent at midnight, merge them into
a single period for better user experience.
Args:
periods: List of periods (each period is a list of interval dicts)
Returns:
List of periods with adjacent midnight periods merged
"""
if not periods:
return periods
merged = []
i = 0
while i < len(periods):
current_period = periods[i]
# Check if there's a next period and if they meet at midnight
if i + 1 < len(periods):
next_period = periods[i + 1]
# Get the last interval of current period and first interval of next period
last_interval = current_period[-1]
first_interval = next_period[0]
last_start = last_interval.get("interval_start")
next_start = first_interval.get("interval_start")
# Check if they are adjacent (15 minutes apart) and cross midnight
if last_start and next_start:
time_diff = next_start - last_start
last_date = last_start.date()
next_date = next_start.date()
# If they are 15 minutes apart and on different days (crossing midnight)
if time_diff == timedelta(minutes=MINUTES_PER_INTERVAL) and next_date > last_date:
# Merge the two periods
merged_period = current_period + next_period
merged.append(merged_period)
i += 2 # Skip both periods as we've merged them
continue
# If no merge happened, just add the current period
merged.append(current_period)
i += 1
return merged
def _add_interval_ends(self, periods: list[list[dict]]) -> None:
"""Add interval_end to each interval using per-interval interval_length."""
for period in periods:
for idx, interval in enumerate(period):
if idx + 1 < len(period):
interval["interval_end"] = period[idx + 1]["interval_start"]
else:
interval["interval_end"] = interval["interval_start"] + timedelta(minutes=MINUTES_PER_INTERVAL)
def _filter_intervals_today_tomorrow(self, result: list[dict]) -> list[dict]:
"""Filter intervals to only include those from today and tomorrow."""
today = dt_util.now().date()
tomorrow = today + timedelta(days=1)
return [
interval
for interval in result
if interval.get("_interval_start") and today <= interval["_interval_start"].date() <= tomorrow
]
def _find_current_or_next_interval(self, filtered_result: list[dict]) -> dict | None:
"""Find the current or next interval from the filtered list."""
now = dt_util.now()
for interval in filtered_result:
start = interval.get("_interval_start")
end = interval.get("_interval_end")
if start and end and start <= now < end:
return interval.copy()
for interval in filtered_result:
start = interval.get("_interval_start")
if start and start > now:
return interval.copy()
return None
def _filter_periods_today_tomorrow(self, periods: list[list[dict]]) -> list[list[dict]]:
"""
Filter periods to include those that are currently active or in the future.
This includes periods that started yesterday but are still active now (crossing midnight),
as well as periods happening today or tomorrow. We don't want to show all past periods
from yesterday, only those that extend into today.
"""
now = dt_util.now()
today = now.date()
tomorrow = today + timedelta(days=1)
filtered = []
for period in periods:
if not period:
continue
# Get the period's end time (last interval's end)
last_interval = period[-1]
period_end = last_interval.get("interval_end")
# Get the period's start time (first interval's start)
first_interval = period[0]
period_start = first_interval.get("interval_start")
if not period_end or not period_start:
continue
# Include period if:
# 1. It's still active (ends in the future), OR
# 2. It has intervals today or tomorrow
if period_end > now or any(
interval.get("interval_start") and today <= interval["interval_start"].date() <= tomorrow
for interval in period
):
filtered.append(period)
return filtered
def _build_final_attributes(
self,
current_interval: dict | None,
periods_summary: list[dict],
filtered_result: list[dict],
) -> dict:
"""
Build the final attributes dictionary from period summary and current interval.
Combines period-level attributes with current interval-specific attributes,
ensuring price_diff reflects the current interval's position vs daily min/max.
"""
now = dt_util.now()
current_minute = (now.minute // 15) * 15
timestamp = now.replace(minute=current_minute, second=0, microsecond=0)
if current_interval and periods_summary:
# Find the current period in the summary based on period_start
current_period_start = current_interval.get("period_start")
current_period_summary = None
for period in periods_summary:
if period.get("start") == current_period_start:
current_period_summary = period
break
if current_period_summary:
# Copy all attributes from the period summary
attributes = {"timestamp": timestamp}
attributes.update(current_period_summary)
# Add interval-specific price_diff attributes (separate from period average)
# Shows the reference interval's position vs daily min/max:
# - If period is active: current 15-min interval vs daily min/max
# - If period hasn't started: first interval of the period vs daily min/max
# This value is what determines if an interval is part of a period (compared to flex setting)
if "price_diff_from_min" in current_interval:
attributes["interval_price_diff_from_daily_min"] = current_interval["price_diff_from_min"]
attributes["interval_price_diff_from_daily_min_%"] = current_interval.get("price_diff_from_min_%")
elif "price_diff_from_max" in current_interval:
attributes["interval_price_diff_from_daily_max"] = current_interval["price_diff_from_max"]
attributes["interval_price_diff_from_daily_max_%"] = current_interval.get("price_diff_from_max_%")
attributes["periods"] = periods_summary
attributes["intervals_count"] = len(filtered_result)
return attributes
# Fallback if current period not found in summary
return {
"timestamp": timestamp,
"periods": periods_summary,
"intervals_count": len(filtered_result),
}
# No periods found
return {
"timestamp": timestamp,
"periods": [],
"intervals_count": 0,
}
def _get_price_intervals_attributes(self, *, reverse_sort: bool) -> dict | None:
"""
Get price interval attributes with caching to avoid expensive recalculation.
Uses a cache key based on coordinator data timestamp and config options.
Returns simplified attributes without the full intervals list to reduce payload.
"""
# Check cache first
cache_key = self._generate_cache_key(reverse_sort=reverse_sort)
if cache_key and cache_key == self._cache_key and self._period_cache:
return self._period_cache
# Cache miss - perform expensive calculation
if not self.coordinator.data:
return None
price_info = self.coordinator.data.get("priceInfo", {})
yesterday_prices = price_info.get("yesterday", [])
today_prices = price_info.get("today", [])
tomorrow_prices = price_info.get("tomorrow", [])
all_prices = yesterday_prices + today_prices + tomorrow_prices
if not all_prices:
return None
all_prices.sort(key=lambda p: p["startsAt"])
intervals_by_day, avg_price_by_day = self._split_intervals_by_day(all_prices)
ref_prices = self._calculate_reference_prices(intervals_by_day, reverse_sort=reverse_sort)
flex = self._get_flex_option(
CONF_BEST_PRICE_FLEX if not reverse_sort else CONF_PEAK_PRICE_FLEX,
DEFAULT_BEST_PRICE_FLEX if not reverse_sort else DEFAULT_PEAK_PRICE_FLEX,
)
min_distance_from_avg = self._get_flex_option(
CONF_BEST_PRICE_MIN_DISTANCE_FROM_AVG if not reverse_sort else CONF_PEAK_PRICE_MIN_DISTANCE_FROM_AVG,
DEFAULT_BEST_PRICE_MIN_DISTANCE_FROM_AVG if not reverse_sort else DEFAULT_PEAK_PRICE_MIN_DISTANCE_FROM_AVG,
)
price_context = {
"ref_prices": ref_prices,
"avg_prices": avg_price_by_day,
"flex": flex,
"min_distance_from_avg": min_distance_from_avg,
}
periods = self._build_periods(all_prices, price_context, reverse_sort=reverse_sort)
periods = self._filter_periods_by_min_length(periods, reverse_sort=reverse_sort)
periods = self._merge_adjacent_periods_at_midnight(periods)
self._add_interval_ends(periods)
filtered_periods = self._filter_periods_today_tomorrow(periods)
# Simplified annotation - only annotate enough to find current interval and provide summary
result = self._annotate_period_intervals(
filtered_periods,
ref_prices,
avg_price_by_day,
)
filtered_result = self._filter_intervals_today_tomorrow(result)
current_interval = self._find_current_or_next_interval(filtered_result)
if not current_interval and filtered_result:
current_interval = filtered_result[0]
# Build periods array first
periods_summary = self._build_periods_summary(filtered_result) if filtered_result else []
# Build final attributes using helper method
attributes = self._build_final_attributes(current_interval, periods_summary, filtered_result)
# Cache the result (with internal fields intact)
self._cache_key = cache_key
self._period_cache = attributes
return attributes
def _build_periods_summary(self, intervals: list[dict]) -> list[dict]:
"""
Build a summary of periods with consistent attribute structure.
Returns a list of period summaries with the same attributes as top-level,
making the structure predictable and easy to use in automations.
"""
if not intervals:
return []
# Group intervals by period (they have the same period_start)
periods_dict: dict[str, list[dict]] = {}
for interval in intervals:
period_key = interval.get("period_start")
if period_key:
key_str = period_key.isoformat() if hasattr(period_key, "isoformat") else str(period_key)
if key_str not in periods_dict:
periods_dict[key_str] = []
periods_dict[key_str].append(interval)
# Build summary for each period with consistent attribute names
summaries = []
for period_intervals in periods_dict.values():
if not period_intervals:
continue
first = period_intervals[0]
prices = [i["price"] for i in period_intervals if "price" in i]
# Use same attribute names as top-level for consistency
summary = {
"start": first.get("period_start"),
"end": first.get("period_end"),
"hour": first.get("hour"),
"minute": first.get("minute"),
"time": first.get("time"),
"duration_minutes": first.get("duration_minutes"),
"periods_total": first.get("periods_total"),
"periods_remaining": first.get("periods_remaining"),
"period_position": first.get("period_position"),
"intervals_count": len(period_intervals),
"price_avg": round(sum(prices) / len(prices), 2) if prices else 0,
"price_min": round(min(prices), 2) if prices else 0,
"price_max": round(max(prices), 2) if prices else 0,
}
# Add price_diff attributes if present
self._add_price_diff_for_period(summary, period_intervals, first)
summaries.append(summary)
return summaries
def _add_price_diff_for_period(self, summary: dict, period_intervals: list[dict], first: dict) -> None:
"""
Add price difference attributes for the period based on sensor type.
Uses the reference price (min/max) from the start day of the period to ensure
consistent comparison, especially for periods spanning midnight.
Calculates how the period's average price compares to the daily min/max,
helping to explain why the period qualifies based on flex settings.
"""
# Determine sensor type and get the reference price from the first interval
# (which represents the start of the period and its day's reference value)
if "price_diff_from_min" in first:
# Best price sensor: calculate difference from the period's start day minimum
period_start = first.get("period_start")
if not period_start:
return
# Get all prices in minor units (cents/øre) from the period
prices = [i["price"] for i in period_intervals if "price" in i]
if not prices:
return
period_avg_price = sum(prices) / len(prices)
# Extract the reference min price from first interval's calculation
# We can back-calculate it from the first interval's price and diff
first_price_minor = first.get("price")
first_diff_minor = first.get("price_diff_from_min")
if first_price_minor is not None and first_diff_minor is not None:
ref_min_price = first_price_minor - first_diff_minor
period_diff = period_avg_price - ref_min_price
# Period average price difference from daily minimum
summary["period_price_diff_from_daily_min"] = round(period_diff, 2)
if ref_min_price != 0:
period_diff_pct = (period_diff / ref_min_price) * 100
summary["period_price_diff_from_daily_min_%"] = round(period_diff_pct, 2)
elif "price_diff_from_max" in first:
# Peak price sensor: calculate difference from the period's start day maximum
period_start = first.get("period_start")
if not period_start:
return
# Get all prices in minor units (cents/øre) from the period
prices = [i["price"] for i in period_intervals if "price" in i]
if not prices:
return
period_avg_price = sum(prices) / len(prices)
# Extract the reference max price from first interval's calculation
first_price_minor = first.get("price")
first_diff_minor = first.get("price_diff_from_max")
if first_price_minor is not None and first_diff_minor is not None:
ref_max_price = first_price_minor - first_diff_minor
period_diff = period_avg_price - ref_max_price
# Period average price difference from daily maximum
summary["period_price_diff_from_daily_max"] = round(period_diff, 2)
if ref_max_price != 0:
period_diff_pct = (period_diff / ref_max_price) * 100
summary["period_price_diff_from_daily_max_%"] = round(period_diff_pct, 2)
def _get_price_hours_attributes(self, *, attribute_name: str, reverse_sort: bool) -> dict | None:
"""Get price hours attributes."""
if not self.coordinator.data:
return None
price_info = self.coordinator.data.get("priceInfo", {})
today_prices = price_info.get("today", [])
if not today_prices:
return None
prices = [
(
datetime.fromisoformat(price["startsAt"]).hour,
float(price["total"]),
)
for price in today_prices
]
# Sort by price (high to low for peak, low to high for best)
sorted_hours = sorted(prices, key=lambda x: x[1], reverse=reverse_sort)[:5]
return {attribute_name: [{"hour": hour, "price": price} for hour, price in sorted_hours]}
@property
def is_on(self) -> bool | None:
"""Return true if the binary_sensor is on."""
try:
if not self.coordinator.data or not self._state_getter:
return None
return self._state_getter()
except (KeyError, ValueError, TypeError) as ex:
self.coordinator.logger.exception(
"Error getting binary sensor state",
extra={
"error": str(ex),
"entity": self.entity_description.key,
},
)
return None
@property
async def async_extra_state_attributes(self) -> dict | None:
"""Return additional state attributes asynchronously."""
try:
# Get the dynamic attributes if the getter is available
if not self.coordinator.data:
return None
attributes = {}
if self._attribute_getter:
dynamic_attrs = self._attribute_getter()
if dynamic_attrs:
# Copy and remove internal fields before exposing to user
clean_attrs = {k: v for k, v in dynamic_attrs.items() if not k.startswith("_")}
attributes.update(clean_attrs)
# Add descriptions from the custom translations file
if self.entity_description.translation_key and self.hass is not None:
# Get user's language preference
language = self.hass.config.language if self.hass.config.language else "en"
# Add basic description
description = await async_get_entity_description(
self.hass,
"binary_sensor",
self.entity_description.translation_key,
language,
"description",
)
if description:
attributes["description"] = description
# Check if extended descriptions are enabled in the config
extended_descriptions = self.coordinator.config_entry.options.get(
CONF_EXTENDED_DESCRIPTIONS,
self.coordinator.config_entry.data.get(CONF_EXTENDED_DESCRIPTIONS, DEFAULT_EXTENDED_DESCRIPTIONS),
)
# Add extended descriptions if enabled
if extended_descriptions:
# Add long description if available
long_desc = await async_get_entity_description(
self.hass,
"binary_sensor",
self.entity_description.translation_key,
language,
"long_description",
)
if long_desc:
attributes["long_description"] = long_desc
# Add usage tips if available
usage_tips = await async_get_entity_description(
self.hass,
"binary_sensor",
self.entity_description.translation_key,
language,
"usage_tips",
)
if usage_tips:
attributes["usage_tips"] = usage_tips
except (KeyError, ValueError, TypeError) as ex:
self.coordinator.logger.exception(
"Error getting binary sensor attributes",
extra={
"error": str(ex),
"entity": self.entity_description.key,
},
)
return None
else:
return attributes if attributes else None
@property
def extra_state_attributes(self) -> dict | None:
"""Return additional state attributes synchronously."""
try:
# Start with dynamic attributes if available
if not self.coordinator.data:
return None
attributes = {}
if self._attribute_getter:
dynamic_attrs = self._attribute_getter()
if dynamic_attrs:
# Copy and remove internal fields before exposing to user
clean_attrs = {k: v for k, v in dynamic_attrs.items() if not k.startswith("_")}
attributes.update(clean_attrs)
# Add descriptions from the cache (non-blocking)
if self.entity_description.translation_key and self.hass is not None:
# Get user's language preference
language = self.hass.config.language if self.hass.config.language else "en"
# Add basic description from cache
description = get_entity_description(
"binary_sensor",
self.entity_description.translation_key,
language,
"description",
)
if description:
attributes["description"] = description
# Check if extended descriptions are enabled in the config
extended_descriptions = self.coordinator.config_entry.options.get(
CONF_EXTENDED_DESCRIPTIONS,
self.coordinator.config_entry.data.get(CONF_EXTENDED_DESCRIPTIONS, DEFAULT_EXTENDED_DESCRIPTIONS),
)
# Add extended descriptions if enabled (from cache only)
if extended_descriptions:
# Add long description if available in cache
long_desc = get_entity_description(
"binary_sensor",
self.entity_description.translation_key,
language,
"long_description",
)
if long_desc:
attributes["long_description"] = long_desc
# Add usage tips if available in cache
usage_tips = get_entity_description(
"binary_sensor",
self.entity_description.translation_key,
language,
"usage_tips",
)
if usage_tips:
attributes["usage_tips"] = usage_tips
except (KeyError, ValueError, TypeError) as ex:
self.coordinator.logger.exception(
"Error getting binary sensor attributes",
extra={
"error": str(ex),
"entity": self.entity_description.key,
},
)
return None
else:
return attributes if attributes else None
async def async_update(self) -> None:
"""Force a refresh when homeassistant.update_entity is called."""
await self.coordinator.async_request_refresh()