hass.tibber_prices/custom_components/tibber_prices/binary_sensor.py
Julian Pawlowski c4f36d04de feat(icons): add dynamic icons and colors for all sensor types
Implemented comprehensive dynamic icon and color system across all sensor types:

Price Sensors (5 sensors):
- Current/hour prices: Dynamic cash-family icons based on price level
  (cash-multiple/plus/cash/minus/remove)
- Next/previous: Static contextual icons (cash-fast, cash-refund, clock-fast)
- All have icon_color attribute for card-mod styling

Price Level Sensors (5 sensors):
- Dynamic gauge-family icons: gauge-empty → gauge-low → gauge → gauge-full → alert
- icon_color attribute with CSS variables (green/gray/orange/red)

Price Rating Sensors (5 sensors):
- Dynamic thumb-family icons: thumb-up → thumbs-up-down → thumb-down
- icon_color attribute for LOW/NORMAL/HIGH ratings

Volatility Sensors (4 sensors):
- Dynamic chart-family icons: chart-line-variant → chart-timeline-variant →
  chart-bar → chart-scatter-plot
- icon_color attribute for LOW/MODERATE/HIGH/VERY_HIGH levels

Trend Sensors (8 sensors):
- Dynamic trend icons: trending-up/down/neutral based on price movement
- icon_color attribute (red=rising, green=falling, gray=stable)

Binary Sensors (2 sensors):
- Best Price Period: piggy-bank (ON) / timer-sand or timer-sand-complete (OFF)
- Peak Price Period: alert-circle (ON) / shield-check or shield-check-outline (OFF)
- 6-hour lookahead window for intelligent OFF state icons
- icon_color attribute for all states

Technical implementation:
- PRICE_LEVEL_CASH_ICON_MAPPING in const.py for price sensor icons
- PRICE_SENSOR_ICON_MAPPING removed (static icons now in entity descriptions)
- Centralized icon logic in sensor.py icon property
- All color mappings use CSS variables for theme compatibility
- Binary sensors detect future periods within 6-hour window

Impact: Users now have visual indicators for all price-related states without
requiring card-mod. Optional card-mod styling available via icon_color attribute
for advanced customization. Icons update dynamically as price levels, ratings,
volatility, and trends change throughout the day.
2025-11-14 11:31:25 +00:00

664 lines
26 KiB
Python

"""Binary sensor platform for tibber_prices."""
from __future__ import annotations
from datetime import timedelta
from typing import TYPE_CHECKING
from homeassistant.components.binary_sensor import (
BinarySensorDeviceClass,
BinarySensorEntity,
BinarySensorEntityDescription,
)
from homeassistant.const import EntityCategory
from homeassistant.core import callback
from homeassistant.util import dt as dt_util
from .coordinator import TIME_SENSITIVE_ENTITY_KEYS
from .entity import TibberPricesEntity
if TYPE_CHECKING:
from collections.abc import Callable
from datetime import datetime
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from .coordinator import TibberPricesDataUpdateCoordinator
from .data import TibberPricesConfigEntry
from .const import (
BINARY_SENSOR_COLOR_MAPPING,
BINARY_SENSOR_ICON_MAPPING,
CONF_EXTENDED_DESCRIPTIONS,
DEFAULT_EXTENDED_DESCRIPTIONS,
async_get_entity_description,
get_entity_description,
)
MINUTES_PER_INTERVAL = 15
MIN_TOMORROW_INTERVALS_15MIN = 96
# Look-ahead window for future period detection (hours)
# Icons will show "waiting" state if a period starts within this window
PERIOD_LOOKAHEAD_HOURS = 6
ENTITY_DESCRIPTIONS = (
BinarySensorEntityDescription(
key="peak_price_period",
translation_key="peak_price_period",
name="Peak Price Interval",
icon="mdi:clock-alert",
),
BinarySensorEntityDescription(
key="best_price_period",
translation_key="best_price_period",
name="Best Price Interval",
icon="mdi:clock-check",
),
BinarySensorEntityDescription(
key="connection",
translation_key="connection",
name="Tibber API Connection",
device_class=BinarySensorDeviceClass.CONNECTIVITY,
entity_category=EntityCategory.DIAGNOSTIC,
),
BinarySensorEntityDescription(
key="tomorrow_data_available",
translation_key="tomorrow_data_available",
name="Tomorrow's Data Available",
icon="mdi:calendar-check",
entity_category=EntityCategory.DIAGNOSTIC,
),
)
async def async_setup_entry(
_hass: HomeAssistant,
entry: TibberPricesConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up the binary_sensor platform."""
async_add_entities(
TibberPricesBinarySensor(
coordinator=entry.runtime_data.coordinator,
entity_description=entity_description,
)
for entity_description in ENTITY_DESCRIPTIONS
)
class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
"""tibber_prices binary_sensor class."""
def __init__(
self,
coordinator: TibberPricesDataUpdateCoordinator,
entity_description: BinarySensorEntityDescription,
) -> None:
"""Initialize the binary_sensor class."""
super().__init__(coordinator)
self.entity_description = entity_description
self._attr_unique_id = f"{coordinator.config_entry.entry_id}_{entity_description.key}"
self._state_getter: Callable | None = self._get_state_getter()
self._attribute_getter: Callable | None = self._get_attribute_getter()
self._time_sensitive_remove_listener: Callable | None = None
async def async_added_to_hass(self) -> None:
"""When entity is added to hass."""
await super().async_added_to_hass()
# Register with coordinator for time-sensitive updates if applicable
if self.entity_description.key in TIME_SENSITIVE_ENTITY_KEYS:
self._time_sensitive_remove_listener = self.coordinator.async_add_time_sensitive_listener(
self._handle_time_sensitive_update
)
async def async_will_remove_from_hass(self) -> None:
"""When entity will be removed from hass."""
await super().async_will_remove_from_hass()
# Remove time-sensitive listener if registered
if self._time_sensitive_remove_listener:
self._time_sensitive_remove_listener()
self._time_sensitive_remove_listener = None
@callback
def _handle_time_sensitive_update(self) -> None:
"""Handle time-sensitive update from coordinator."""
self.async_write_ha_state()
def _get_state_getter(self) -> Callable | None:
"""Return the appropriate state getter method based on the sensor type."""
key = self.entity_description.key
if key == "peak_price_period":
return self._peak_price_state
if key == "best_price_period":
return self._best_price_state
if key == "connection":
return lambda: True if self.coordinator.data else None
if key == "tomorrow_data_available":
return self._tomorrow_data_available_state
return None
def _best_price_state(self) -> bool | None:
"""Return True if the current time is within a best price period."""
if not self.coordinator.data:
return None
attrs = self._get_price_intervals_attributes(reverse_sort=False)
if not attrs:
return False # Should not happen, but safety fallback
start = attrs.get("start")
end = attrs.get("end")
if not start or not end:
return False # No period found = sensor is off
now = dt_util.now()
return start <= now < end
def _peak_price_state(self) -> bool | None:
"""Return True if the current time is within a peak price period."""
if not self.coordinator.data:
return None
attrs = self._get_price_intervals_attributes(reverse_sort=True)
if not attrs:
return False # Should not happen, but safety fallback
start = attrs.get("start")
end = attrs.get("end")
if not start or not end:
return False # No period found = sensor is off
now = dt_util.now()
return start <= now < end
def _tomorrow_data_available_state(self) -> bool | None:
"""Return True if tomorrow's data is fully available, False if not, None if unknown."""
if not self.coordinator.data:
return None
price_info = self.coordinator.data.get("priceInfo", {})
tomorrow_prices = price_info.get("tomorrow", [])
interval_count = len(tomorrow_prices)
if interval_count == MIN_TOMORROW_INTERVALS_15MIN:
return True
if interval_count == 0:
return False
return False
def _get_tomorrow_data_available_attributes(self) -> dict | None:
"""Return attributes for tomorrow_data_available binary sensor."""
if not self.coordinator.data:
return None
price_info = self.coordinator.data.get("priceInfo", {})
tomorrow_prices = price_info.get("tomorrow", [])
interval_count = len(tomorrow_prices)
if interval_count == 0:
status = "none"
elif interval_count == MIN_TOMORROW_INTERVALS_15MIN:
status = "full"
else:
status = "partial"
return {
"intervals_available": interval_count,
"data_status": status,
}
def _get_attribute_getter(self) -> Callable | None:
"""Return the appropriate attribute getter method based on the sensor type."""
key = self.entity_description.key
if key == "peak_price_period":
return lambda: self._get_price_intervals_attributes(reverse_sort=True)
if key == "best_price_period":
return lambda: self._get_price_intervals_attributes(reverse_sort=False)
if key == "tomorrow_data_available":
return self._get_tomorrow_data_available_attributes
return None
def _get_precomputed_period_data(self, *, reverse_sort: bool) -> dict | None:
"""
Get precomputed period data from coordinator.
Returns lightweight period summaries (no full price data to avoid redundancy).
"""
if not self.coordinator.data:
return None
periods_data = self.coordinator.data.get("periods", {})
period_type = "peak_price" if reverse_sort else "best_price"
return periods_data.get(period_type)
def _get_price_intervals_attributes(self, *, reverse_sort: bool) -> dict | None:
"""
Get price interval attributes using precomputed data from coordinator.
All data is already calculated in the coordinator - we just need to:
1. Get period summaries from coordinator (already filtered and fully calculated)
2. Add the current timestamp
3. Find current or next period based on time
Note: All calculations (filtering, aggregations, level/rating) are done in coordinator.
"""
# Get precomputed period summaries from coordinator (already filtered and complete!)
period_data = self._get_precomputed_period_data(reverse_sort=reverse_sort)
if not period_data:
return self._build_no_periods_result()
period_summaries = period_data.get("periods", [])
if not period_summaries:
return self._build_no_periods_result()
# Find current or next period based on current time
now = dt_util.now()
current_period = None
# First pass: find currently active period
for period in period_summaries:
start = period.get("start")
end = period.get("end")
if start and end and start <= now < end:
current_period = period
break
# Second pass: find next future period if none is active
if not current_period:
for period in period_summaries:
start = period.get("start")
if start and start > now:
current_period = period
break
# Build final attributes
return self._build_final_attributes_simple(current_period, period_summaries)
def _build_no_periods_result(self) -> dict:
"""
Build result when no periods exist (not filtered, just none available).
Returns:
A dict with empty periods and timestamp.
"""
# Calculate timestamp: current time rounded down to last quarter hour
now = dt_util.now()
current_minute = (now.minute // 15) * 15
timestamp = now.replace(minute=current_minute, second=0, microsecond=0)
return {
"timestamp": timestamp,
"start": None,
"end": None,
"periods": [],
}
def _add_time_attributes(self, attributes: dict, current_period: dict, timestamp: datetime) -> None:
"""Add time-related attributes (priority 1)."""
attributes["timestamp"] = timestamp
if "start" in current_period:
attributes["start"] = current_period["start"]
if "end" in current_period:
attributes["end"] = current_period["end"]
if "duration_minutes" in current_period:
attributes["duration_minutes"] = current_period["duration_minutes"]
def _add_decision_attributes(self, attributes: dict, current_period: dict) -> None:
"""Add core decision attributes (priority 2)."""
if "level" in current_period:
attributes["level"] = current_period["level"]
if "rating_level" in current_period:
attributes["rating_level"] = current_period["rating_level"]
if "rating_difference_%" in current_period:
attributes["rating_difference_%"] = current_period["rating_difference_%"]
def _add_price_attributes(self, attributes: dict, current_period: dict) -> None:
"""Add price statistics attributes (priority 3)."""
if "price_avg" in current_period:
attributes["price_avg"] = current_period["price_avg"]
if "price_min" in current_period:
attributes["price_min"] = current_period["price_min"]
if "price_max" in current_period:
attributes["price_max"] = current_period["price_max"]
if "price_spread" in current_period:
attributes["price_spread"] = current_period["price_spread"]
if "volatility" in current_period:
attributes["volatility"] = current_period["volatility"]
def _add_comparison_attributes(self, attributes: dict, current_period: dict) -> None:
"""Add price comparison attributes (priority 4)."""
if "period_price_diff_from_daily_min" in current_period:
attributes["period_price_diff_from_daily_min"] = current_period["period_price_diff_from_daily_min"]
if "period_price_diff_from_daily_min_%" in current_period:
attributes["period_price_diff_from_daily_min_%"] = current_period["period_price_diff_from_daily_min_%"]
def _add_detail_attributes(self, attributes: dict, current_period: dict) -> None:
"""Add detail information attributes (priority 5)."""
if "period_interval_count" in current_period:
attributes["period_interval_count"] = current_period["period_interval_count"]
if "period_position" in current_period:
attributes["period_position"] = current_period["period_position"]
if "periods_total" in current_period:
attributes["periods_total"] = current_period["periods_total"]
if "periods_remaining" in current_period:
attributes["periods_remaining"] = current_period["periods_remaining"]
def _add_relaxation_attributes(self, attributes: dict, current_period: dict) -> None:
"""
Add relaxation information attributes (priority 6).
Only adds relaxation attributes if the period was actually relaxed.
If relaxation_active is False or missing, no attributes are added.
"""
if current_period.get("relaxation_active"):
attributes["relaxation_active"] = True
if "relaxation_level" in current_period:
attributes["relaxation_level"] = current_period["relaxation_level"]
if "relaxation_threshold_original_%" in current_period:
attributes["relaxation_threshold_original_%"] = current_period["relaxation_threshold_original_%"]
if "relaxation_threshold_applied_%" in current_period:
attributes["relaxation_threshold_applied_%"] = current_period["relaxation_threshold_applied_%"]
def _build_final_attributes_simple(
self,
current_period: dict | None,
period_summaries: list[dict],
) -> dict:
"""
Build the final attributes dictionary from coordinator's period summaries.
All calculations are done in the coordinator - this just:
1. Adds the current timestamp (only thing calculated every 15min)
2. Uses the current/next period from summaries
3. Adds nested period summaries
Attributes are ordered following the documented priority:
1. Time information (timestamp, start, end, duration)
2. Core decision attributes (level, rating_level, rating_difference_%)
3. Price statistics (price_avg, price_min, price_max, price_spread, volatility)
4. Price differences (period_price_diff_from_daily_min, period_price_diff_from_daily_min_%)
5. Detail information (period_interval_count, period_position, periods_total, periods_remaining)
6. Relaxation information (relaxation_active, relaxation_level, relaxation_threshold_original_%,
relaxation_threshold_applied_%) - only if period was relaxed
7. Meta information (periods list)
Args:
current_period: The current or next period (already complete from coordinator)
period_summaries: All period summaries from coordinator
"""
now = dt_util.now()
current_minute = (now.minute // 15) * 15
timestamp = now.replace(minute=current_minute, second=0, microsecond=0)
if current_period:
# Build attributes in priority order using helper methods
attributes = {}
# 1. Time information
self._add_time_attributes(attributes, current_period, timestamp)
# 2. Core decision attributes
self._add_decision_attributes(attributes, current_period)
# 3. Price statistics
self._add_price_attributes(attributes, current_period)
# 4. Price differences
self._add_comparison_attributes(attributes, current_period)
# 5. Detail information
self._add_detail_attributes(attributes, current_period)
# 6. Relaxation information (only if period was relaxed)
self._add_relaxation_attributes(attributes, current_period)
# 7. Meta information (periods array)
attributes["periods"] = period_summaries
return attributes
# No current/next period found - return all periods with timestamp
return {
"timestamp": timestamp,
"periods": period_summaries,
}
@property
def is_on(self) -> bool | None:
"""Return true if the binary_sensor is on."""
try:
if not self.coordinator.data or not self._state_getter:
return None
return self._state_getter()
except (KeyError, ValueError, TypeError) as ex:
self.coordinator.logger.exception(
"Error getting binary sensor state",
extra={
"error": str(ex),
"entity": self.entity_description.key,
},
)
return None
@property
def icon(self) -> str | None:
"""Return the icon based on binary sensor state."""
key = self.entity_description.key
# Dynamic icons for best/peak price period sensors
if key in BINARY_SENSOR_ICON_MAPPING:
if self.is_on:
# Sensor is ON - use "on" icon
icon = BINARY_SENSOR_ICON_MAPPING[key].get("on")
else:
# Sensor is OFF - check if future periods exist
has_future_periods = self._has_future_periods()
if has_future_periods:
icon = BINARY_SENSOR_ICON_MAPPING[key].get("off")
else:
icon = BINARY_SENSOR_ICON_MAPPING[key].get("off_no_future")
if icon:
return icon
# For all other sensors, use static icon from entity description
return self.entity_description.icon
def _has_future_periods(self) -> bool:
"""
Check if there are periods starting within the next 6 hours.
Returns True if any period starts between now and PERIOD_LOOKAHEAD_HOURS from now.
This provides a practical planning horizon instead of hard midnight cutoff.
"""
if not self._attribute_getter:
return False
attrs = self._attribute_getter()
if not attrs or "periods" not in attrs:
return False
now = dt_util.now()
horizon = now + timedelta(hours=PERIOD_LOOKAHEAD_HOURS)
periods = attrs.get("periods", [])
# Check if any period starts within the look-ahead window
for period in periods:
start_str = period.get("start")
if start_str:
# Parse datetime if it's a string, otherwise use as-is
start_time = dt_util.parse_datetime(start_str) if isinstance(start_str, str) else start_str
if start_time:
start_time_local = dt_util.as_local(start_time)
# Period starts in the future but within our horizon
if now < start_time_local <= horizon:
return True
return False
@property
async def async_extra_state_attributes(self) -> dict | None:
"""Return additional state attributes asynchronously."""
try:
# Get the dynamic attributes if the getter is available
if not self.coordinator.data:
return None
attributes = {}
if self._attribute_getter:
dynamic_attrs = self._attribute_getter()
if dynamic_attrs:
# Copy and remove internal fields before exposing to user
clean_attrs = {k: v for k, v in dynamic_attrs.items() if not k.startswith("_")}
attributes.update(clean_attrs)
# Add icon_color for best/peak price period sensors
key = self.entity_description.key
if key in BINARY_SENSOR_COLOR_MAPPING:
state = "on" if self.is_on else "off"
color = BINARY_SENSOR_COLOR_MAPPING[key].get(state)
if color:
attributes["icon_color"] = color
# Add descriptions from the custom translations file
if self.entity_description.translation_key and self.hass is not None:
# Get user's language preference
language = self.hass.config.language if self.hass.config.language else "en"
# Add basic description
description = await async_get_entity_description(
self.hass,
"binary_sensor",
self.entity_description.translation_key,
language,
"description",
)
if description:
attributes["description"] = description
# Check if extended descriptions are enabled in the config
extended_descriptions = self.coordinator.config_entry.options.get(
CONF_EXTENDED_DESCRIPTIONS,
self.coordinator.config_entry.data.get(CONF_EXTENDED_DESCRIPTIONS, DEFAULT_EXTENDED_DESCRIPTIONS),
)
# Add extended descriptions if enabled
if extended_descriptions:
# Add long description if available
long_desc = await async_get_entity_description(
self.hass,
"binary_sensor",
self.entity_description.translation_key,
language,
"long_description",
)
if long_desc:
attributes["long_description"] = long_desc
# Add usage tips if available
usage_tips = await async_get_entity_description(
self.hass,
"binary_sensor",
self.entity_description.translation_key,
language,
"usage_tips",
)
if usage_tips:
attributes["usage_tips"] = usage_tips
except (KeyError, ValueError, TypeError) as ex:
self.coordinator.logger.exception(
"Error getting binary sensor attributes",
extra={
"error": str(ex),
"entity": self.entity_description.key,
},
)
return None
else:
return attributes if attributes else None
@property
def extra_state_attributes(self) -> dict | None:
"""Return additional state attributes synchronously."""
try:
# Start with dynamic attributes if available
if not self.coordinator.data:
return None
attributes = {}
if self._attribute_getter:
dynamic_attrs = self._attribute_getter()
if dynamic_attrs:
# Copy and remove internal fields before exposing to user
clean_attrs = {k: v for k, v in dynamic_attrs.items() if not k.startswith("_")}
attributes.update(clean_attrs)
# Add icon_color for best/peak price period sensors
key = self.entity_description.key
if key in BINARY_SENSOR_COLOR_MAPPING:
state = "on" if self.is_on else "off"
color = BINARY_SENSOR_COLOR_MAPPING[key].get(state)
if color:
attributes["icon_color"] = color
# Add descriptions from the cache (non-blocking)
if self.entity_description.translation_key and self.hass is not None:
# Get user's language preference
language = self.hass.config.language if self.hass.config.language else "en"
# Add basic description from cache
description = get_entity_description(
"binary_sensor",
self.entity_description.translation_key,
language,
"description",
)
if description:
attributes["description"] = description
# Check if extended descriptions are enabled in the config
extended_descriptions = self.coordinator.config_entry.options.get(
CONF_EXTENDED_DESCRIPTIONS,
self.coordinator.config_entry.data.get(CONF_EXTENDED_DESCRIPTIONS, DEFAULT_EXTENDED_DESCRIPTIONS),
)
# Add extended descriptions if enabled (from cache only)
if extended_descriptions:
# Add long description if available in cache
long_desc = get_entity_description(
"binary_sensor",
self.entity_description.translation_key,
language,
"long_description",
)
if long_desc:
attributes["long_description"] = long_desc
# Add usage tips if available in cache
usage_tips = get_entity_description(
"binary_sensor",
self.entity_description.translation_key,
language,
"usage_tips",
)
if usage_tips:
attributes["usage_tips"] = usage_tips
except (KeyError, ValueError, TypeError) as ex:
self.coordinator.logger.exception(
"Error getting binary sensor attributes",
extra={
"error": str(ex),
"entity": self.entity_description.key,
},
)
return None
else:
return attributes if attributes else None
async def async_update(self) -> None:
"""Force a refresh when homeassistant.update_entity is called."""
await self.coordinator.async_request_refresh()