mirror of
https://github.com/jpawlowski/hass.tibber_prices.git
synced 2026-03-30 05:13:40 +00:00
refactor(binary_sensor): split into package matching sensor/ structure
Split binary_sensor.py (645 lines) into binary_sensor/ package with
4 modules following the established sensor/ pattern for consistency
and maintainability.
Package structure:
- binary_sensor/__init__.py (32 lines): Platform setup
- binary_sensor/definitions.py (46 lines): ENTITY_DESCRIPTIONS, constants
- binary_sensor/attributes.py (443 lines): Attribute builder functions
- binary_sensor/core.py (282 lines): TibberPricesBinarySensor class
Changes:
- Created binary_sensor/ package with __init__.py importing from .core
- Extracted ENTITY_DESCRIPTIONS and constants to definitions.py
- Moved 13 attribute builders to attributes.py (get_price_intervals_attributes,
build_async/sync_extra_state_attributes, add_* helpers)
- Moved TibberPricesBinarySensor class to core.py with state logic and
icon handling
- Used keyword-only parameters to satisfy Ruff PLR0913 (too many args)
- Applied absolute imports (custom_components.tibber_prices.*) in modules
All 4 binary sensors tested and working:
- peak_price_period
- best_price_period
- connection
- tomorrow_data_available
Documentation updated:
- AGENTS.md: Architecture Overview, Component Structure, Common Tasks
- binary-sensor-refactoring-plan.md: Marked ✅ COMPLETED with summary
Impact: Symmetric platform structure (sensor/ ↔ binary_sensor/). Easier
to add new binary sensors following documented pattern. No user-visible
changes.
This commit is contained in:
parent
78498a9aec
commit
efda22f7ad
6 changed files with 908 additions and 648 deletions
112
AGENTS.md
112
AGENTS.md
|
|
@ -4,8 +4,8 @@ This is a **Home Assistant custom component** for Tibber electricity price data,
|
||||||
|
|
||||||
## Documentation Metadata
|
## Documentation Metadata
|
||||||
|
|
||||||
- **Last Major Update**: 2025-11-17
|
- **Last Major Update**: 2025-11-15
|
||||||
- **Last Architecture Review**: 2025-11-17 (Module splitting refactoring completed - sensor.py split into sensor/ package with core.py, definitions.py, helpers.py, attributes.py. Created entity_utils/ package for shared icon/color/attribute logic. All phases complete.)
|
- **Last Architecture Review**: 2025-11-15 (Module splitting refactoring completed - sensor.py and binary_sensor.py split into packages with core.py, definitions.py, helpers.py, attributes.py. Created entity_utils/ package for shared icon/color/attribute logic. All phases complete.)
|
||||||
- **Documentation Status**: ✅ Current (verified against codebase)
|
- **Documentation Status**: ✅ Current (verified against codebase)
|
||||||
|
|
||||||
_Note: When proposing significant updates to this file, update the metadata above with the new date and brief description of changes._
|
_Note: When proposing significant updates to this file, update the metadata above with the new date and brief description of changes._
|
||||||
|
|
@ -235,7 +235,7 @@ After successful refactoring:
|
||||||
1. `TibberPricesApiClient` (`api.py`) queries Tibber's GraphQL API with `resolution:QUARTER_HOURLY` for user data and prices (yesterday/today/tomorrow - 192 intervals total)
|
1. `TibberPricesApiClient` (`api.py`) queries Tibber's GraphQL API with `resolution:QUARTER_HOURLY` for user data and prices (yesterday/today/tomorrow - 192 intervals total)
|
||||||
2. `TibberPricesDataUpdateCoordinator` (`coordinator.py`) orchestrates updates every 15 minutes, manages persistent storage via `Store`, and schedules quarter-hour entity refreshes
|
2. `TibberPricesDataUpdateCoordinator` (`coordinator.py`) orchestrates updates every 15 minutes, manages persistent storage via `Store`, and schedules quarter-hour entity refreshes
|
||||||
3. Price enrichment functions (`price_utils.py`, `average_utils.py`) calculate trailing/leading 24h averages, price differences, and rating levels for each 15-minute interval
|
3. Price enrichment functions (`price_utils.py`, `average_utils.py`) calculate trailing/leading 24h averages, price differences, and rating levels for each 15-minute interval
|
||||||
4. Entity platforms (`sensor/` package, `binary_sensor.py`) expose enriched data as Home Assistant entities
|
4. Entity platforms (`sensor/` package, `binary_sensor/` package) expose enriched data as Home Assistant entities
|
||||||
5. Custom services (`services.py`) provide API endpoints for integrations like ApexCharts
|
5. Custom services (`services.py`) provide API endpoints for integrations like ApexCharts
|
||||||
|
|
||||||
**Key Patterns:**
|
**Key Patterns:**
|
||||||
|
|
@ -333,7 +333,11 @@ custom_components/tibber_prices/
|
||||||
│ ├── definitions.py # ENTITY_DESCRIPTIONS
|
│ ├── definitions.py # ENTITY_DESCRIPTIONS
|
||||||
│ ├── helpers.py # Pure helper functions
|
│ ├── helpers.py # Pure helper functions
|
||||||
│ └── attributes.py # Attribute builders
|
│ └── attributes.py # Attribute builders
|
||||||
├── binary_sensor.py # Peak/best hour binary sensors
|
├── binary_sensor/ # Binary sensor platform (package)
|
||||||
|
│ ├── __init__.py # Platform setup (async_setup_entry)
|
||||||
|
│ ├── core.py # TibberPricesBinarySensor class
|
||||||
|
│ ├── definitions.py # ENTITY_DESCRIPTIONS, constants
|
||||||
|
│ └── attributes.py # Attribute builders
|
||||||
├── entity.py # Base TibberPricesEntity class
|
├── entity.py # Base TibberPricesEntity class
|
||||||
├── entity_utils/ # Shared entity helpers (both platforms)
|
├── entity_utils/ # Shared entity helpers (both platforms)
|
||||||
│ ├── __init__.py # Package exports
|
│ ├── __init__.py # Package exports
|
||||||
|
|
@ -1994,6 +1998,106 @@ The refactoring consolidated duplicate logic into unified methods in `sensor/cor
|
||||||
|
|
||||||
Legacy wrapper methods still exist for backward compatibility but will be removed in a future cleanup phase.
|
Legacy wrapper methods still exist for backward compatibility but will be removed in a future cleanup phase.
|
||||||
|
|
||||||
|
**Add a new binary sensor:**
|
||||||
|
|
||||||
|
After the binary_sensor.py refactoring (completed Nov 2025), binary sensors are organized similarly to the sensor/ package. Follow these steps:
|
||||||
|
|
||||||
|
1. **Add entity description** to `binary_sensor/definitions.py`:
|
||||||
|
|
||||||
|
- Add to `ENTITY_DESCRIPTIONS` tuple
|
||||||
|
- Define key, translation_key, name, icon, device_class
|
||||||
|
|
||||||
|
2. **Implement state logic** in `binary_sensor/core.py`:
|
||||||
|
|
||||||
|
- Add state property (e.g., `_my_feature_state`) returning bool
|
||||||
|
- Update `is_on` property to route to your state method
|
||||||
|
- Follow pattern: Check coordinator data availability, calculate state, return bool
|
||||||
|
|
||||||
|
3. **Add attribute builder** (if needed) in `binary_sensor/attributes.py`:
|
||||||
|
|
||||||
|
- Create `build_my_feature_attributes()` function
|
||||||
|
- Return dict with relevant attributes
|
||||||
|
- Update `build_async_extra_state_attributes()` or `build_sync_extra_state_attributes()` to call your builder
|
||||||
|
|
||||||
|
4. **Add translation keys**:
|
||||||
|
|
||||||
|
- `/translations/en.json` (entity name per HA schema)
|
||||||
|
- `/custom_translations/en.json` (description, long_description, usage_tips)
|
||||||
|
|
||||||
|
5. **Sync all language files** (de, nb, nl, sv)
|
||||||
|
|
||||||
|
**Example - Adding a "low price alert" binary sensor:**
|
||||||
|
|
||||||
|
```python
|
||||||
|
# 1. Add to ENTITY_DESCRIPTIONS in binary_sensor/definitions.py
|
||||||
|
BinarySensorEntityDescription(
|
||||||
|
key="low_price_alert",
|
||||||
|
translation_key="low_price_alert",
|
||||||
|
name="Low Price Alert",
|
||||||
|
icon="mdi:alert-circle",
|
||||||
|
device_class=BinarySensorDeviceClass.PROBLEM, # ON = problem (not low)
|
||||||
|
),
|
||||||
|
|
||||||
|
# 2. Add state property in binary_sensor/core.py
|
||||||
|
@property
|
||||||
|
def _low_price_alert_state(self) -> bool:
|
||||||
|
"""Return True if current price is NOT in low price range."""
|
||||||
|
if not self.coordinator.data or "priceInfo" not in self.coordinator.data:
|
||||||
|
return False
|
||||||
|
|
||||||
|
price_info = self.coordinator.data["priceInfo"]
|
||||||
|
today_prices = price_info.get("today", [])
|
||||||
|
if not today_prices:
|
||||||
|
return False
|
||||||
|
|
||||||
|
current_interval = today_prices[0] # Simplified - should find actual current
|
||||||
|
return current_interval.get("rating_level") != "LOW"
|
||||||
|
|
||||||
|
# 3. Update is_on property routing
|
||||||
|
@property
|
||||||
|
def is_on(self) -> bool:
|
||||||
|
"""Return sensor state."""
|
||||||
|
if self.entity_description.key == "low_price_alert":
|
||||||
|
return self._low_price_alert_state
|
||||||
|
# ... existing routing ...
|
||||||
|
|
||||||
|
# 4. Add attribute builder in binary_sensor/attributes.py (optional)
|
||||||
|
def build_low_price_alert_attributes(
|
||||||
|
coordinator: TibberPricesDataUpdateCoordinator,
|
||||||
|
) -> dict[str, Any]:
|
||||||
|
"""Build attributes for low price alert sensor."""
|
||||||
|
if not coordinator.data or "priceInfo" not in coordinator.data:
|
||||||
|
return {}
|
||||||
|
|
||||||
|
price_info = coordinator.data["priceInfo"]
|
||||||
|
current_price = price_info["today"][0].get("total", 0)
|
||||||
|
|
||||||
|
return {
|
||||||
|
"current_price": current_price,
|
||||||
|
"threshold": 0.20, # Example threshold
|
||||||
|
}
|
||||||
|
|
||||||
|
# 5. Add translations (en.json)
|
||||||
|
{
|
||||||
|
"entity": {
|
||||||
|
"binary_sensor": {
|
||||||
|
"low_price_alert": {
|
||||||
|
"name": "Low Price Alert"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
# 6. Add custom translations (custom_translations/en.json)
|
||||||
|
{
|
||||||
|
"binary_sensor": {
|
||||||
|
"low_price_alert": {
|
||||||
|
"description": "Alert when current price is NOT in low price range"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
**Modify price calculations:**
|
**Modify price calculations:**
|
||||||
Edit `price_utils.py` or `average_utils.py`. These are stateless pure functions operating on price lists.
|
Edit `price_utils.py` or `average_utils.py`. These are stateless pure functions operating on price lists.
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,644 +0,0 @@
|
||||||
"""Binary sensor platform for tibber_prices."""
|
|
||||||
|
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
from datetime import timedelta
|
|
||||||
from typing import TYPE_CHECKING
|
|
||||||
|
|
||||||
from homeassistant.components.binary_sensor import (
|
|
||||||
BinarySensorDeviceClass,
|
|
||||||
BinarySensorEntity,
|
|
||||||
BinarySensorEntityDescription,
|
|
||||||
)
|
|
||||||
from homeassistant.const import EntityCategory
|
|
||||||
from homeassistant.core import callback
|
|
||||||
from homeassistant.util import dt as dt_util
|
|
||||||
|
|
||||||
from .coordinator import TIME_SENSITIVE_ENTITY_KEYS
|
|
||||||
from .entity import TibberPricesEntity
|
|
||||||
from .entity_utils import add_icon_color_attribute, get_binary_sensor_icon
|
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
|
||||||
from collections.abc import Callable
|
|
||||||
from datetime import datetime
|
|
||||||
|
|
||||||
from homeassistant.core import HomeAssistant
|
|
||||||
from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
|
||||||
|
|
||||||
from .coordinator import TibberPricesDataUpdateCoordinator
|
|
||||||
from .data import TibberPricesConfigEntry
|
|
||||||
|
|
||||||
from .const import (
|
|
||||||
CONF_EXTENDED_DESCRIPTIONS,
|
|
||||||
DEFAULT_EXTENDED_DESCRIPTIONS,
|
|
||||||
async_get_entity_description,
|
|
||||||
get_entity_description,
|
|
||||||
)
|
|
||||||
|
|
||||||
MINUTES_PER_INTERVAL = 15
|
|
||||||
MIN_TOMORROW_INTERVALS_15MIN = 96
|
|
||||||
|
|
||||||
# Look-ahead window for future period detection (hours)
|
|
||||||
# Icons will show "waiting" state if a period starts within this window
|
|
||||||
PERIOD_LOOKAHEAD_HOURS = 6
|
|
||||||
|
|
||||||
ENTITY_DESCRIPTIONS = (
|
|
||||||
BinarySensorEntityDescription(
|
|
||||||
key="peak_price_period",
|
|
||||||
translation_key="peak_price_period",
|
|
||||||
name="Peak Price Interval",
|
|
||||||
icon="mdi:clock-alert",
|
|
||||||
),
|
|
||||||
BinarySensorEntityDescription(
|
|
||||||
key="best_price_period",
|
|
||||||
translation_key="best_price_period",
|
|
||||||
name="Best Price Interval",
|
|
||||||
icon="mdi:clock-check",
|
|
||||||
),
|
|
||||||
BinarySensorEntityDescription(
|
|
||||||
key="connection",
|
|
||||||
translation_key="connection",
|
|
||||||
name="Tibber API Connection",
|
|
||||||
device_class=BinarySensorDeviceClass.CONNECTIVITY,
|
|
||||||
entity_category=EntityCategory.DIAGNOSTIC,
|
|
||||||
),
|
|
||||||
BinarySensorEntityDescription(
|
|
||||||
key="tomorrow_data_available",
|
|
||||||
translation_key="tomorrow_data_available",
|
|
||||||
name="Tomorrow's Data Available",
|
|
||||||
icon="mdi:calendar-check",
|
|
||||||
entity_category=EntityCategory.DIAGNOSTIC,
|
|
||||||
),
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
async def async_setup_entry(
|
|
||||||
_hass: HomeAssistant,
|
|
||||||
entry: TibberPricesConfigEntry,
|
|
||||||
async_add_entities: AddEntitiesCallback,
|
|
||||||
) -> None:
|
|
||||||
"""Set up the binary_sensor platform."""
|
|
||||||
async_add_entities(
|
|
||||||
TibberPricesBinarySensor(
|
|
||||||
coordinator=entry.runtime_data.coordinator,
|
|
||||||
entity_description=entity_description,
|
|
||||||
)
|
|
||||||
for entity_description in ENTITY_DESCRIPTIONS
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
|
|
||||||
"""tibber_prices binary_sensor class."""
|
|
||||||
|
|
||||||
def __init__(
|
|
||||||
self,
|
|
||||||
coordinator: TibberPricesDataUpdateCoordinator,
|
|
||||||
entity_description: BinarySensorEntityDescription,
|
|
||||||
) -> None:
|
|
||||||
"""Initialize the binary_sensor class."""
|
|
||||||
super().__init__(coordinator)
|
|
||||||
self.entity_description = entity_description
|
|
||||||
self._attr_unique_id = f"{coordinator.config_entry.entry_id}_{entity_description.key}"
|
|
||||||
self._state_getter: Callable | None = self._get_state_getter()
|
|
||||||
self._attribute_getter: Callable | None = self._get_attribute_getter()
|
|
||||||
self._time_sensitive_remove_listener: Callable | None = None
|
|
||||||
|
|
||||||
async def async_added_to_hass(self) -> None:
|
|
||||||
"""When entity is added to hass."""
|
|
||||||
await super().async_added_to_hass()
|
|
||||||
|
|
||||||
# Register with coordinator for time-sensitive updates if applicable
|
|
||||||
if self.entity_description.key in TIME_SENSITIVE_ENTITY_KEYS:
|
|
||||||
self._time_sensitive_remove_listener = self.coordinator.async_add_time_sensitive_listener(
|
|
||||||
self._handle_time_sensitive_update
|
|
||||||
)
|
|
||||||
|
|
||||||
async def async_will_remove_from_hass(self) -> None:
|
|
||||||
"""When entity will be removed from hass."""
|
|
||||||
await super().async_will_remove_from_hass()
|
|
||||||
|
|
||||||
# Remove time-sensitive listener if registered
|
|
||||||
if self._time_sensitive_remove_listener:
|
|
||||||
self._time_sensitive_remove_listener()
|
|
||||||
self._time_sensitive_remove_listener = None
|
|
||||||
|
|
||||||
@callback
|
|
||||||
def _handle_time_sensitive_update(self) -> None:
|
|
||||||
"""Handle time-sensitive update from coordinator."""
|
|
||||||
self.async_write_ha_state()
|
|
||||||
|
|
||||||
def _get_state_getter(self) -> Callable | None:
|
|
||||||
"""Return the appropriate state getter method based on the sensor type."""
|
|
||||||
key = self.entity_description.key
|
|
||||||
|
|
||||||
if key == "peak_price_period":
|
|
||||||
return self._peak_price_state
|
|
||||||
if key == "best_price_period":
|
|
||||||
return self._best_price_state
|
|
||||||
if key == "connection":
|
|
||||||
return lambda: True if self.coordinator.data else None
|
|
||||||
if key == "tomorrow_data_available":
|
|
||||||
return self._tomorrow_data_available_state
|
|
||||||
|
|
||||||
return None
|
|
||||||
|
|
||||||
def _best_price_state(self) -> bool | None:
|
|
||||||
"""Return True if the current time is within a best price period."""
|
|
||||||
if not self.coordinator.data:
|
|
||||||
return None
|
|
||||||
attrs = self._get_price_intervals_attributes(reverse_sort=False)
|
|
||||||
if not attrs:
|
|
||||||
return False # Should not happen, but safety fallback
|
|
||||||
start = attrs.get("start")
|
|
||||||
end = attrs.get("end")
|
|
||||||
if not start or not end:
|
|
||||||
return False # No period found = sensor is off
|
|
||||||
now = dt_util.now()
|
|
||||||
return start <= now < end
|
|
||||||
|
|
||||||
def _peak_price_state(self) -> bool | None:
|
|
||||||
"""Return True if the current time is within a peak price period."""
|
|
||||||
if not self.coordinator.data:
|
|
||||||
return None
|
|
||||||
attrs = self._get_price_intervals_attributes(reverse_sort=True)
|
|
||||||
if not attrs:
|
|
||||||
return False # Should not happen, but safety fallback
|
|
||||||
start = attrs.get("start")
|
|
||||||
end = attrs.get("end")
|
|
||||||
if not start or not end:
|
|
||||||
return False # No period found = sensor is off
|
|
||||||
now = dt_util.now()
|
|
||||||
return start <= now < end
|
|
||||||
|
|
||||||
def _tomorrow_data_available_state(self) -> bool | None:
|
|
||||||
"""Return True if tomorrow's data is fully available, False if not, None if unknown."""
|
|
||||||
if not self.coordinator.data:
|
|
||||||
return None
|
|
||||||
price_info = self.coordinator.data.get("priceInfo", {})
|
|
||||||
tomorrow_prices = price_info.get("tomorrow", [])
|
|
||||||
interval_count = len(tomorrow_prices)
|
|
||||||
if interval_count == MIN_TOMORROW_INTERVALS_15MIN:
|
|
||||||
return True
|
|
||||||
if interval_count == 0:
|
|
||||||
return False
|
|
||||||
return False
|
|
||||||
|
|
||||||
def _get_tomorrow_data_available_attributes(self) -> dict | None:
|
|
||||||
"""Return attributes for tomorrow_data_available binary sensor."""
|
|
||||||
if not self.coordinator.data:
|
|
||||||
return None
|
|
||||||
price_info = self.coordinator.data.get("priceInfo", {})
|
|
||||||
tomorrow_prices = price_info.get("tomorrow", [])
|
|
||||||
interval_count = len(tomorrow_prices)
|
|
||||||
if interval_count == 0:
|
|
||||||
status = "none"
|
|
||||||
elif interval_count == MIN_TOMORROW_INTERVALS_15MIN:
|
|
||||||
status = "full"
|
|
||||||
else:
|
|
||||||
status = "partial"
|
|
||||||
return {
|
|
||||||
"intervals_available": interval_count,
|
|
||||||
"data_status": status,
|
|
||||||
}
|
|
||||||
|
|
||||||
def _get_attribute_getter(self) -> Callable | None:
|
|
||||||
"""Return the appropriate attribute getter method based on the sensor type."""
|
|
||||||
key = self.entity_description.key
|
|
||||||
|
|
||||||
if key == "peak_price_period":
|
|
||||||
return lambda: self._get_price_intervals_attributes(reverse_sort=True)
|
|
||||||
if key == "best_price_period":
|
|
||||||
return lambda: self._get_price_intervals_attributes(reverse_sort=False)
|
|
||||||
if key == "tomorrow_data_available":
|
|
||||||
return self._get_tomorrow_data_available_attributes
|
|
||||||
|
|
||||||
return None
|
|
||||||
|
|
||||||
def _get_precomputed_period_data(self, *, reverse_sort: bool) -> dict | None:
|
|
||||||
"""
|
|
||||||
Get precomputed period data from coordinator.
|
|
||||||
|
|
||||||
Returns lightweight period summaries (no full price data to avoid redundancy).
|
|
||||||
"""
|
|
||||||
if not self.coordinator.data:
|
|
||||||
return None
|
|
||||||
|
|
||||||
periods_data = self.coordinator.data.get("periods", {})
|
|
||||||
period_type = "peak_price" if reverse_sort else "best_price"
|
|
||||||
return periods_data.get(period_type)
|
|
||||||
|
|
||||||
def _get_price_intervals_attributes(self, *, reverse_sort: bool) -> dict | None:
|
|
||||||
"""
|
|
||||||
Get price interval attributes using precomputed data from coordinator.
|
|
||||||
|
|
||||||
All data is already calculated in the coordinator - we just need to:
|
|
||||||
1. Get period summaries from coordinator (already filtered and fully calculated)
|
|
||||||
2. Add the current timestamp
|
|
||||||
3. Find current or next period based on time
|
|
||||||
|
|
||||||
Note: All calculations (filtering, aggregations, level/rating) are done in coordinator.
|
|
||||||
"""
|
|
||||||
# Get precomputed period summaries from coordinator (already filtered and complete!)
|
|
||||||
period_data = self._get_precomputed_period_data(reverse_sort=reverse_sort)
|
|
||||||
if not period_data:
|
|
||||||
return self._build_no_periods_result()
|
|
||||||
|
|
||||||
period_summaries = period_data.get("periods", [])
|
|
||||||
if not period_summaries:
|
|
||||||
return self._build_no_periods_result()
|
|
||||||
|
|
||||||
# Find current or next period based on current time
|
|
||||||
now = dt_util.now()
|
|
||||||
current_period = None
|
|
||||||
|
|
||||||
# First pass: find currently active period
|
|
||||||
for period in period_summaries:
|
|
||||||
start = period.get("start")
|
|
||||||
end = period.get("end")
|
|
||||||
if start and end and start <= now < end:
|
|
||||||
current_period = period
|
|
||||||
break
|
|
||||||
|
|
||||||
# Second pass: find next future period if none is active
|
|
||||||
if not current_period:
|
|
||||||
for period in period_summaries:
|
|
||||||
start = period.get("start")
|
|
||||||
if start and start > now:
|
|
||||||
current_period = period
|
|
||||||
break
|
|
||||||
|
|
||||||
# Build final attributes
|
|
||||||
return self._build_final_attributes_simple(current_period, period_summaries)
|
|
||||||
|
|
||||||
def _build_no_periods_result(self) -> dict:
|
|
||||||
"""
|
|
||||||
Build result when no periods exist (not filtered, just none available).
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
A dict with empty periods and timestamp.
|
|
||||||
|
|
||||||
"""
|
|
||||||
# Calculate timestamp: current time rounded down to last quarter hour
|
|
||||||
now = dt_util.now()
|
|
||||||
current_minute = (now.minute // 15) * 15
|
|
||||||
timestamp = now.replace(minute=current_minute, second=0, microsecond=0)
|
|
||||||
|
|
||||||
return {
|
|
||||||
"timestamp": timestamp,
|
|
||||||
"start": None,
|
|
||||||
"end": None,
|
|
||||||
"periods": [],
|
|
||||||
}
|
|
||||||
|
|
||||||
def _add_time_attributes(self, attributes: dict, current_period: dict, timestamp: datetime) -> None:
|
|
||||||
"""Add time-related attributes (priority 1)."""
|
|
||||||
attributes["timestamp"] = timestamp
|
|
||||||
if "start" in current_period:
|
|
||||||
attributes["start"] = current_period["start"]
|
|
||||||
if "end" in current_period:
|
|
||||||
attributes["end"] = current_period["end"]
|
|
||||||
if "duration_minutes" in current_period:
|
|
||||||
attributes["duration_minutes"] = current_period["duration_minutes"]
|
|
||||||
|
|
||||||
def _add_decision_attributes(self, attributes: dict, current_period: dict) -> None:
|
|
||||||
"""Add core decision attributes (priority 2)."""
|
|
||||||
if "level" in current_period:
|
|
||||||
attributes["level"] = current_period["level"]
|
|
||||||
if "rating_level" in current_period:
|
|
||||||
attributes["rating_level"] = current_period["rating_level"]
|
|
||||||
if "rating_difference_%" in current_period:
|
|
||||||
attributes["rating_difference_%"] = current_period["rating_difference_%"]
|
|
||||||
|
|
||||||
def _add_price_attributes(self, attributes: dict, current_period: dict) -> None:
|
|
||||||
"""Add price statistics attributes (priority 3)."""
|
|
||||||
if "price_avg" in current_period:
|
|
||||||
attributes["price_avg"] = current_period["price_avg"]
|
|
||||||
if "price_min" in current_period:
|
|
||||||
attributes["price_min"] = current_period["price_min"]
|
|
||||||
if "price_max" in current_period:
|
|
||||||
attributes["price_max"] = current_period["price_max"]
|
|
||||||
if "price_spread" in current_period:
|
|
||||||
attributes["price_spread"] = current_period["price_spread"]
|
|
||||||
if "volatility" in current_period:
|
|
||||||
attributes["volatility"] = current_period["volatility"]
|
|
||||||
|
|
||||||
def _add_comparison_attributes(self, attributes: dict, current_period: dict) -> None:
|
|
||||||
"""Add price comparison attributes (priority 4)."""
|
|
||||||
if "period_price_diff_from_daily_min" in current_period:
|
|
||||||
attributes["period_price_diff_from_daily_min"] = current_period["period_price_diff_from_daily_min"]
|
|
||||||
if "period_price_diff_from_daily_min_%" in current_period:
|
|
||||||
attributes["period_price_diff_from_daily_min_%"] = current_period["period_price_diff_from_daily_min_%"]
|
|
||||||
|
|
||||||
def _add_detail_attributes(self, attributes: dict, current_period: dict) -> None:
|
|
||||||
"""Add detail information attributes (priority 5)."""
|
|
||||||
if "period_interval_count" in current_period:
|
|
||||||
attributes["period_interval_count"] = current_period["period_interval_count"]
|
|
||||||
if "period_position" in current_period:
|
|
||||||
attributes["period_position"] = current_period["period_position"]
|
|
||||||
if "periods_total" in current_period:
|
|
||||||
attributes["periods_total"] = current_period["periods_total"]
|
|
||||||
if "periods_remaining" in current_period:
|
|
||||||
attributes["periods_remaining"] = current_period["periods_remaining"]
|
|
||||||
|
|
||||||
def _add_relaxation_attributes(self, attributes: dict, current_period: dict) -> None:
|
|
||||||
"""
|
|
||||||
Add relaxation information attributes (priority 6).
|
|
||||||
|
|
||||||
Only adds relaxation attributes if the period was actually relaxed.
|
|
||||||
If relaxation_active is False or missing, no attributes are added.
|
|
||||||
"""
|
|
||||||
if current_period.get("relaxation_active"):
|
|
||||||
attributes["relaxation_active"] = True
|
|
||||||
if "relaxation_level" in current_period:
|
|
||||||
attributes["relaxation_level"] = current_period["relaxation_level"]
|
|
||||||
if "relaxation_threshold_original_%" in current_period:
|
|
||||||
attributes["relaxation_threshold_original_%"] = current_period["relaxation_threshold_original_%"]
|
|
||||||
if "relaxation_threshold_applied_%" in current_period:
|
|
||||||
attributes["relaxation_threshold_applied_%"] = current_period["relaxation_threshold_applied_%"]
|
|
||||||
|
|
||||||
def _build_final_attributes_simple(
|
|
||||||
self,
|
|
||||||
current_period: dict | None,
|
|
||||||
period_summaries: list[dict],
|
|
||||||
) -> dict:
|
|
||||||
"""
|
|
||||||
Build the final attributes dictionary from coordinator's period summaries.
|
|
||||||
|
|
||||||
All calculations are done in the coordinator - this just:
|
|
||||||
1. Adds the current timestamp (only thing calculated every 15min)
|
|
||||||
2. Uses the current/next period from summaries
|
|
||||||
3. Adds nested period summaries
|
|
||||||
|
|
||||||
Attributes are ordered following the documented priority:
|
|
||||||
1. Time information (timestamp, start, end, duration)
|
|
||||||
2. Core decision attributes (level, rating_level, rating_difference_%)
|
|
||||||
3. Price statistics (price_avg, price_min, price_max, price_spread, volatility)
|
|
||||||
4. Price differences (period_price_diff_from_daily_min, period_price_diff_from_daily_min_%)
|
|
||||||
5. Detail information (period_interval_count, period_position, periods_total, periods_remaining)
|
|
||||||
6. Relaxation information (relaxation_active, relaxation_level, relaxation_threshold_original_%,
|
|
||||||
relaxation_threshold_applied_%) - only if period was relaxed
|
|
||||||
7. Meta information (periods list)
|
|
||||||
|
|
||||||
Args:
|
|
||||||
current_period: The current or next period (already complete from coordinator)
|
|
||||||
period_summaries: All period summaries from coordinator
|
|
||||||
|
|
||||||
"""
|
|
||||||
now = dt_util.now()
|
|
||||||
current_minute = (now.minute // 15) * 15
|
|
||||||
timestamp = now.replace(minute=current_minute, second=0, microsecond=0)
|
|
||||||
|
|
||||||
if current_period:
|
|
||||||
# Build attributes in priority order using helper methods
|
|
||||||
attributes = {}
|
|
||||||
|
|
||||||
# 1. Time information
|
|
||||||
self._add_time_attributes(attributes, current_period, timestamp)
|
|
||||||
|
|
||||||
# 2. Core decision attributes
|
|
||||||
self._add_decision_attributes(attributes, current_period)
|
|
||||||
|
|
||||||
# 3. Price statistics
|
|
||||||
self._add_price_attributes(attributes, current_period)
|
|
||||||
|
|
||||||
# 4. Price differences
|
|
||||||
self._add_comparison_attributes(attributes, current_period)
|
|
||||||
|
|
||||||
# 5. Detail information
|
|
||||||
self._add_detail_attributes(attributes, current_period)
|
|
||||||
|
|
||||||
# 6. Relaxation information (only if period was relaxed)
|
|
||||||
self._add_relaxation_attributes(attributes, current_period)
|
|
||||||
|
|
||||||
# 7. Meta information (periods array)
|
|
||||||
attributes["periods"] = period_summaries
|
|
||||||
|
|
||||||
return attributes
|
|
||||||
|
|
||||||
# No current/next period found - return all periods with timestamp
|
|
||||||
return {
|
|
||||||
"timestamp": timestamp,
|
|
||||||
"periods": period_summaries,
|
|
||||||
}
|
|
||||||
|
|
||||||
@property
|
|
||||||
def is_on(self) -> bool | None:
|
|
||||||
"""Return true if the binary_sensor is on."""
|
|
||||||
try:
|
|
||||||
if not self.coordinator.data or not self._state_getter:
|
|
||||||
return None
|
|
||||||
|
|
||||||
return self._state_getter()
|
|
||||||
|
|
||||||
except (KeyError, ValueError, TypeError) as ex:
|
|
||||||
self.coordinator.logger.exception(
|
|
||||||
"Error getting binary sensor state",
|
|
||||||
extra={
|
|
||||||
"error": str(ex),
|
|
||||||
"entity": self.entity_description.key,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
return None
|
|
||||||
|
|
||||||
@property
|
|
||||||
def icon(self) -> str | None:
|
|
||||||
"""Return the icon based on binary sensor state."""
|
|
||||||
key = self.entity_description.key
|
|
||||||
|
|
||||||
# Use shared icon utility
|
|
||||||
icon = get_binary_sensor_icon(
|
|
||||||
key,
|
|
||||||
is_on=self.is_on,
|
|
||||||
has_future_periods_callback=self._has_future_periods,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Fall back to static icon from entity description
|
|
||||||
return icon or self.entity_description.icon
|
|
||||||
|
|
||||||
def _has_future_periods(self) -> bool:
|
|
||||||
"""
|
|
||||||
Check if there are periods starting within the next 6 hours.
|
|
||||||
|
|
||||||
Returns True if any period starts between now and PERIOD_LOOKAHEAD_HOURS from now.
|
|
||||||
This provides a practical planning horizon instead of hard midnight cutoff.
|
|
||||||
"""
|
|
||||||
if not self._attribute_getter:
|
|
||||||
return False
|
|
||||||
|
|
||||||
attrs = self._attribute_getter()
|
|
||||||
if not attrs or "periods" not in attrs:
|
|
||||||
return False
|
|
||||||
|
|
||||||
now = dt_util.now()
|
|
||||||
horizon = now + timedelta(hours=PERIOD_LOOKAHEAD_HOURS)
|
|
||||||
periods = attrs.get("periods", [])
|
|
||||||
|
|
||||||
# Check if any period starts within the look-ahead window
|
|
||||||
for period in periods:
|
|
||||||
start_str = period.get("start")
|
|
||||||
if start_str:
|
|
||||||
# Parse datetime if it's a string, otherwise use as-is
|
|
||||||
start_time = dt_util.parse_datetime(start_str) if isinstance(start_str, str) else start_str
|
|
||||||
|
|
||||||
if start_time:
|
|
||||||
start_time_local = dt_util.as_local(start_time)
|
|
||||||
# Period starts in the future but within our horizon
|
|
||||||
if now < start_time_local <= horizon:
|
|
||||||
return True
|
|
||||||
|
|
||||||
return False
|
|
||||||
|
|
||||||
@property
|
|
||||||
async def async_extra_state_attributes(self) -> dict | None:
|
|
||||||
"""Return additional state attributes asynchronously."""
|
|
||||||
try:
|
|
||||||
# Get the dynamic attributes if the getter is available
|
|
||||||
if not self.coordinator.data:
|
|
||||||
return None
|
|
||||||
|
|
||||||
attributes = {}
|
|
||||||
if self._attribute_getter:
|
|
||||||
dynamic_attrs = self._attribute_getter()
|
|
||||||
if dynamic_attrs:
|
|
||||||
# Copy and remove internal fields before exposing to user
|
|
||||||
clean_attrs = {k: v for k, v in dynamic_attrs.items() if not k.startswith("_")}
|
|
||||||
attributes.update(clean_attrs)
|
|
||||||
|
|
||||||
# Add icon_color for best/peak price period sensors using shared utility
|
|
||||||
add_icon_color_attribute(attributes, self.entity_description.key, is_on=self.is_on)
|
|
||||||
|
|
||||||
# Add description from the custom translations file
|
|
||||||
if self.entity_description.translation_key and self.hass is not None:
|
|
||||||
# Get user's language preference
|
|
||||||
language = self.hass.config.language if self.hass.config.language else "en"
|
|
||||||
|
|
||||||
# Add basic description
|
|
||||||
description = await async_get_entity_description(
|
|
||||||
self.hass,
|
|
||||||
"binary_sensor",
|
|
||||||
self.entity_description.translation_key,
|
|
||||||
language,
|
|
||||||
"description",
|
|
||||||
)
|
|
||||||
if description:
|
|
||||||
attributes["description"] = description
|
|
||||||
|
|
||||||
# Check if extended descriptions are enabled in the config
|
|
||||||
extended_descriptions = self.coordinator.config_entry.options.get(
|
|
||||||
CONF_EXTENDED_DESCRIPTIONS,
|
|
||||||
self.coordinator.config_entry.data.get(CONF_EXTENDED_DESCRIPTIONS, DEFAULT_EXTENDED_DESCRIPTIONS),
|
|
||||||
)
|
|
||||||
|
|
||||||
# Add extended descriptions if enabled
|
|
||||||
if extended_descriptions:
|
|
||||||
# Add long description if available
|
|
||||||
long_desc = await async_get_entity_description(
|
|
||||||
self.hass,
|
|
||||||
"binary_sensor",
|
|
||||||
self.entity_description.translation_key,
|
|
||||||
language,
|
|
||||||
"long_description",
|
|
||||||
)
|
|
||||||
if long_desc:
|
|
||||||
attributes["long_description"] = long_desc
|
|
||||||
|
|
||||||
# Add usage tips if available
|
|
||||||
usage_tips = await async_get_entity_description(
|
|
||||||
self.hass,
|
|
||||||
"binary_sensor",
|
|
||||||
self.entity_description.translation_key,
|
|
||||||
language,
|
|
||||||
"usage_tips",
|
|
||||||
)
|
|
||||||
if usage_tips:
|
|
||||||
attributes["usage_tips"] = usage_tips
|
|
||||||
|
|
||||||
except (KeyError, ValueError, TypeError) as ex:
|
|
||||||
self.coordinator.logger.exception(
|
|
||||||
"Error getting binary sensor attributes",
|
|
||||||
extra={
|
|
||||||
"error": str(ex),
|
|
||||||
"entity": self.entity_description.key,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
return None
|
|
||||||
else:
|
|
||||||
return attributes if attributes else None
|
|
||||||
|
|
||||||
@property
|
|
||||||
def extra_state_attributes(self) -> dict | None:
|
|
||||||
"""Return additional state attributes synchronously."""
|
|
||||||
try:
|
|
||||||
# Start with dynamic attributes if available
|
|
||||||
if not self.coordinator.data:
|
|
||||||
return None
|
|
||||||
|
|
||||||
attributes = {}
|
|
||||||
if self._attribute_getter:
|
|
||||||
dynamic_attrs = self._attribute_getter()
|
|
||||||
if dynamic_attrs:
|
|
||||||
# Copy and remove internal fields before exposing to user
|
|
||||||
clean_attrs = {k: v for k, v in dynamic_attrs.items() if not k.startswith("_")}
|
|
||||||
attributes.update(clean_attrs)
|
|
||||||
|
|
||||||
# Add icon_color for best/peak price period sensors using shared utility
|
|
||||||
add_icon_color_attribute(attributes, self.entity_description.key, is_on=self.is_on)
|
|
||||||
|
|
||||||
# Add descriptions from the cache (non-blocking)
|
|
||||||
if self.entity_description.translation_key and self.hass is not None:
|
|
||||||
# Get user's language preference
|
|
||||||
language = self.hass.config.language if self.hass.config.language else "en"
|
|
||||||
|
|
||||||
# Add basic description from cache
|
|
||||||
description = get_entity_description(
|
|
||||||
"binary_sensor",
|
|
||||||
self.entity_description.translation_key,
|
|
||||||
language,
|
|
||||||
"description",
|
|
||||||
)
|
|
||||||
if description:
|
|
||||||
attributes["description"] = description
|
|
||||||
|
|
||||||
# Check if extended descriptions are enabled in the config
|
|
||||||
extended_descriptions = self.coordinator.config_entry.options.get(
|
|
||||||
CONF_EXTENDED_DESCRIPTIONS,
|
|
||||||
self.coordinator.config_entry.data.get(CONF_EXTENDED_DESCRIPTIONS, DEFAULT_EXTENDED_DESCRIPTIONS),
|
|
||||||
)
|
|
||||||
|
|
||||||
# Add extended descriptions if enabled (from cache only)
|
|
||||||
if extended_descriptions:
|
|
||||||
# Add long description if available in cache
|
|
||||||
long_desc = get_entity_description(
|
|
||||||
"binary_sensor",
|
|
||||||
self.entity_description.translation_key,
|
|
||||||
language,
|
|
||||||
"long_description",
|
|
||||||
)
|
|
||||||
if long_desc:
|
|
||||||
attributes["long_description"] = long_desc
|
|
||||||
|
|
||||||
# Add usage tips if available in cache
|
|
||||||
usage_tips = get_entity_description(
|
|
||||||
"binary_sensor",
|
|
||||||
self.entity_description.translation_key,
|
|
||||||
language,
|
|
||||||
"usage_tips",
|
|
||||||
)
|
|
||||||
if usage_tips:
|
|
||||||
attributes["usage_tips"] = usage_tips
|
|
||||||
|
|
||||||
except (KeyError, ValueError, TypeError) as ex:
|
|
||||||
self.coordinator.logger.exception(
|
|
||||||
"Error getting binary sensor attributes",
|
|
||||||
extra={
|
|
||||||
"error": str(ex),
|
|
||||||
"entity": self.entity_description.key,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
return None
|
|
||||||
else:
|
|
||||||
return attributes if attributes else None
|
|
||||||
|
|
||||||
async def async_update(self) -> None:
|
|
||||||
"""Force a refresh when homeassistant.update_entity is called."""
|
|
||||||
await self.coordinator.async_request_refresh()
|
|
||||||
28
custom_components/tibber_prices/binary_sensor/__init__.py
Normal file
28
custom_components/tibber_prices/binary_sensor/__init__.py
Normal file
|
|
@ -0,0 +1,28 @@
|
||||||
|
"""Binary sensor platform for tibber_prices."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from typing import TYPE_CHECKING
|
||||||
|
|
||||||
|
from .core import TibberPricesBinarySensor
|
||||||
|
from .definitions import ENTITY_DESCRIPTIONS
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from custom_components.tibber_prices.data import TibberPricesConfigEntry
|
||||||
|
from homeassistant.core import HomeAssistant
|
||||||
|
from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
||||||
|
|
||||||
|
|
||||||
|
async def async_setup_entry(
|
||||||
|
_hass: HomeAssistant,
|
||||||
|
entry: TibberPricesConfigEntry,
|
||||||
|
async_add_entities: AddEntitiesCallback,
|
||||||
|
) -> None:
|
||||||
|
"""Set up Tibber Prices binary sensor based on a config entry."""
|
||||||
|
async_add_entities(
|
||||||
|
TibberPricesBinarySensor(
|
||||||
|
coordinator=entry.runtime_data.coordinator,
|
||||||
|
entity_description=entity_description,
|
||||||
|
)
|
||||||
|
for entity_description in ENTITY_DESCRIPTIONS
|
||||||
|
)
|
||||||
443
custom_components/tibber_prices/binary_sensor/attributes.py
Normal file
443
custom_components/tibber_prices/binary_sensor/attributes.py
Normal file
|
|
@ -0,0 +1,443 @@
|
||||||
|
"""Attribute builders for binary sensors."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from typing import TYPE_CHECKING
|
||||||
|
|
||||||
|
from custom_components.tibber_prices.const import (
|
||||||
|
CONF_EXTENDED_DESCRIPTIONS,
|
||||||
|
DEFAULT_EXTENDED_DESCRIPTIONS,
|
||||||
|
async_get_entity_description,
|
||||||
|
get_entity_description,
|
||||||
|
)
|
||||||
|
from custom_components.tibber_prices.entity_utils import add_icon_color_attribute
|
||||||
|
from homeassistant.util import dt as dt_util
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
from custom_components.tibber_prices.data import TibberPricesConfigEntry
|
||||||
|
from homeassistant.core import HomeAssistant
|
||||||
|
|
||||||
|
from .definitions import MIN_TOMORROW_INTERVALS_15MIN
|
||||||
|
|
||||||
|
|
||||||
|
def get_tomorrow_data_available_attributes(coordinator_data: dict) -> dict | None:
|
||||||
|
"""
|
||||||
|
Build attributes for tomorrow_data_available sensor.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
coordinator_data: Coordinator data dict
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Attributes dict with intervals_available and data_status
|
||||||
|
|
||||||
|
"""
|
||||||
|
if not coordinator_data:
|
||||||
|
return None
|
||||||
|
|
||||||
|
price_info = coordinator_data.get("priceInfo", {})
|
||||||
|
tomorrow_prices = price_info.get("tomorrow", [])
|
||||||
|
interval_count = len(tomorrow_prices)
|
||||||
|
|
||||||
|
if interval_count == 0:
|
||||||
|
status = "none"
|
||||||
|
elif interval_count == MIN_TOMORROW_INTERVALS_15MIN:
|
||||||
|
status = "full"
|
||||||
|
else:
|
||||||
|
status = "partial"
|
||||||
|
|
||||||
|
return {
|
||||||
|
"intervals_available": interval_count,
|
||||||
|
"data_status": status,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def get_price_intervals_attributes(
|
||||||
|
coordinator_data: dict,
|
||||||
|
*,
|
||||||
|
reverse_sort: bool,
|
||||||
|
) -> dict | None:
|
||||||
|
"""
|
||||||
|
Build attributes for period-based sensors (best/peak price).
|
||||||
|
|
||||||
|
All data is already calculated in the coordinator - we just need to:
|
||||||
|
1. Get period summaries from coordinator (already filtered and fully calculated)
|
||||||
|
2. Add the current timestamp
|
||||||
|
3. Find current or next period based on time
|
||||||
|
|
||||||
|
Args:
|
||||||
|
coordinator_data: Coordinator data dict
|
||||||
|
reverse_sort: True for peak_price (highest first), False for best_price (lowest first)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Attributes dict with current/next period and all periods list
|
||||||
|
|
||||||
|
"""
|
||||||
|
if not coordinator_data:
|
||||||
|
return build_no_periods_result()
|
||||||
|
|
||||||
|
# Get precomputed period summaries from coordinator
|
||||||
|
periods_data = coordinator_data.get("periods", {})
|
||||||
|
period_type = "peak_price" if reverse_sort else "best_price"
|
||||||
|
period_data = periods_data.get(period_type)
|
||||||
|
|
||||||
|
if not period_data:
|
||||||
|
return build_no_periods_result()
|
||||||
|
|
||||||
|
period_summaries = period_data.get("periods", [])
|
||||||
|
if not period_summaries:
|
||||||
|
return build_no_periods_result()
|
||||||
|
|
||||||
|
# Find current or next period based on current time
|
||||||
|
now = dt_util.now()
|
||||||
|
current_period = None
|
||||||
|
|
||||||
|
# First pass: find currently active period
|
||||||
|
for period in period_summaries:
|
||||||
|
start = period.get("start")
|
||||||
|
end = period.get("end")
|
||||||
|
if start and end and start <= now < end:
|
||||||
|
current_period = period
|
||||||
|
break
|
||||||
|
|
||||||
|
# Second pass: find next future period if none is active
|
||||||
|
if not current_period:
|
||||||
|
for period in period_summaries:
|
||||||
|
start = period.get("start")
|
||||||
|
if start and start > now:
|
||||||
|
current_period = period
|
||||||
|
break
|
||||||
|
|
||||||
|
# Build final attributes
|
||||||
|
return build_final_attributes_simple(current_period, period_summaries)
|
||||||
|
|
||||||
|
|
||||||
|
def build_no_periods_result() -> dict:
|
||||||
|
"""
|
||||||
|
Build result when no periods exist (not filtered, just none available).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
A dict with empty periods and timestamp.
|
||||||
|
|
||||||
|
"""
|
||||||
|
# Calculate timestamp: current time rounded down to last quarter hour
|
||||||
|
now = dt_util.now()
|
||||||
|
current_minute = (now.minute // 15) * 15
|
||||||
|
timestamp = now.replace(minute=current_minute, second=0, microsecond=0)
|
||||||
|
|
||||||
|
return {
|
||||||
|
"timestamp": timestamp,
|
||||||
|
"start": None,
|
||||||
|
"end": None,
|
||||||
|
"periods": [],
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def add_time_attributes(attributes: dict, current_period: dict, timestamp: datetime) -> None:
|
||||||
|
"""Add time-related attributes (priority 1)."""
|
||||||
|
attributes["timestamp"] = timestamp
|
||||||
|
if "start" in current_period:
|
||||||
|
attributes["start"] = current_period["start"]
|
||||||
|
if "end" in current_period:
|
||||||
|
attributes["end"] = current_period["end"]
|
||||||
|
if "duration_minutes" in current_period:
|
||||||
|
attributes["duration_minutes"] = current_period["duration_minutes"]
|
||||||
|
|
||||||
|
|
||||||
|
def add_decision_attributes(attributes: dict, current_period: dict) -> None:
|
||||||
|
"""Add core decision attributes (priority 2)."""
|
||||||
|
if "level" in current_period:
|
||||||
|
attributes["level"] = current_period["level"]
|
||||||
|
if "rating_level" in current_period:
|
||||||
|
attributes["rating_level"] = current_period["rating_level"]
|
||||||
|
if "rating_difference_%" in current_period:
|
||||||
|
attributes["rating_difference_%"] = current_period["rating_difference_%"]
|
||||||
|
|
||||||
|
|
||||||
|
def add_price_attributes(attributes: dict, current_period: dict) -> None:
|
||||||
|
"""Add price statistics attributes (priority 3)."""
|
||||||
|
if "price_avg" in current_period:
|
||||||
|
attributes["price_avg"] = current_period["price_avg"]
|
||||||
|
if "price_min" in current_period:
|
||||||
|
attributes["price_min"] = current_period["price_min"]
|
||||||
|
if "price_max" in current_period:
|
||||||
|
attributes["price_max"] = current_period["price_max"]
|
||||||
|
if "price_spread" in current_period:
|
||||||
|
attributes["price_spread"] = current_period["price_spread"]
|
||||||
|
if "volatility" in current_period:
|
||||||
|
attributes["volatility"] = current_period["volatility"]
|
||||||
|
|
||||||
|
|
||||||
|
def add_comparison_attributes(attributes: dict, current_period: dict) -> None:
|
||||||
|
"""Add price comparison attributes (priority 4)."""
|
||||||
|
if "period_price_diff_from_daily_min" in current_period:
|
||||||
|
attributes["period_price_diff_from_daily_min"] = current_period["period_price_diff_from_daily_min"]
|
||||||
|
if "period_price_diff_from_daily_min_%" in current_period:
|
||||||
|
attributes["period_price_diff_from_daily_min_%"] = current_period["period_price_diff_from_daily_min_%"]
|
||||||
|
|
||||||
|
|
||||||
|
def add_detail_attributes(attributes: dict, current_period: dict) -> None:
|
||||||
|
"""Add detail information attributes (priority 5)."""
|
||||||
|
if "period_interval_count" in current_period:
|
||||||
|
attributes["period_interval_count"] = current_period["period_interval_count"]
|
||||||
|
if "period_position" in current_period:
|
||||||
|
attributes["period_position"] = current_period["period_position"]
|
||||||
|
if "periods_total" in current_period:
|
||||||
|
attributes["periods_total"] = current_period["periods_total"]
|
||||||
|
if "periods_remaining" in current_period:
|
||||||
|
attributes["periods_remaining"] = current_period["periods_remaining"]
|
||||||
|
|
||||||
|
|
||||||
|
def add_relaxation_attributes(attributes: dict, current_period: dict) -> None:
|
||||||
|
"""
|
||||||
|
Add relaxation information attributes (priority 6).
|
||||||
|
|
||||||
|
Only adds relaxation attributes if the period was actually relaxed.
|
||||||
|
If relaxation_active is False or missing, no attributes are added.
|
||||||
|
"""
|
||||||
|
if current_period.get("relaxation_active"):
|
||||||
|
attributes["relaxation_active"] = True
|
||||||
|
if "relaxation_level" in current_period:
|
||||||
|
attributes["relaxation_level"] = current_period["relaxation_level"]
|
||||||
|
if "relaxation_threshold_original_%" in current_period:
|
||||||
|
attributes["relaxation_threshold_original_%"] = current_period["relaxation_threshold_original_%"]
|
||||||
|
if "relaxation_threshold_applied_%" in current_period:
|
||||||
|
attributes["relaxation_threshold_applied_%"] = current_period["relaxation_threshold_applied_%"]
|
||||||
|
|
||||||
|
|
||||||
|
def build_final_attributes_simple(
|
||||||
|
current_period: dict | None,
|
||||||
|
period_summaries: list[dict],
|
||||||
|
) -> dict:
|
||||||
|
"""
|
||||||
|
Build the final attributes dictionary from coordinator's period summaries.
|
||||||
|
|
||||||
|
All calculations are done in the coordinator - this just:
|
||||||
|
1. Adds the current timestamp (only thing calculated every 15min)
|
||||||
|
2. Uses the current/next period from summaries
|
||||||
|
3. Adds nested period summaries
|
||||||
|
|
||||||
|
Attributes are ordered following the documented priority:
|
||||||
|
1. Time information (timestamp, start, end, duration)
|
||||||
|
2. Core decision attributes (level, rating_level, rating_difference_%)
|
||||||
|
3. Price statistics (price_avg, price_min, price_max, price_spread, volatility)
|
||||||
|
4. Price differences (period_price_diff_from_daily_min, period_price_diff_from_daily_min_%)
|
||||||
|
5. Detail information (period_interval_count, period_position, periods_total, periods_remaining)
|
||||||
|
6. Relaxation information (relaxation_active, relaxation_level, relaxation_threshold_original_%,
|
||||||
|
relaxation_threshold_applied_%) - only if period was relaxed
|
||||||
|
7. Meta information (periods list)
|
||||||
|
|
||||||
|
Args:
|
||||||
|
current_period: The current or next period (already complete from coordinator)
|
||||||
|
period_summaries: All period summaries from coordinator
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Complete attributes dict with all fields
|
||||||
|
|
||||||
|
"""
|
||||||
|
now = dt_util.now()
|
||||||
|
current_minute = (now.minute // 15) * 15
|
||||||
|
timestamp = now.replace(minute=current_minute, second=0, microsecond=0)
|
||||||
|
|
||||||
|
if current_period:
|
||||||
|
# Build attributes in priority order using helper methods
|
||||||
|
attributes = {}
|
||||||
|
|
||||||
|
# 1. Time information
|
||||||
|
add_time_attributes(attributes, current_period, timestamp)
|
||||||
|
|
||||||
|
# 2. Core decision attributes
|
||||||
|
add_decision_attributes(attributes, current_period)
|
||||||
|
|
||||||
|
# 3. Price statistics
|
||||||
|
add_price_attributes(attributes, current_period)
|
||||||
|
|
||||||
|
# 4. Price differences
|
||||||
|
add_comparison_attributes(attributes, current_period)
|
||||||
|
|
||||||
|
# 5. Detail information
|
||||||
|
add_detail_attributes(attributes, current_period)
|
||||||
|
|
||||||
|
# 6. Relaxation information (only if period was relaxed)
|
||||||
|
add_relaxation_attributes(attributes, current_period)
|
||||||
|
|
||||||
|
# 7. Meta information (periods array)
|
||||||
|
attributes["periods"] = period_summaries
|
||||||
|
|
||||||
|
return attributes
|
||||||
|
|
||||||
|
# No current/next period found - return all periods with timestamp
|
||||||
|
return {
|
||||||
|
"timestamp": timestamp,
|
||||||
|
"periods": period_summaries,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
async def build_async_extra_state_attributes( # noqa: PLR0913
|
||||||
|
entity_key: str,
|
||||||
|
translation_key: str | None,
|
||||||
|
hass: HomeAssistant,
|
||||||
|
*,
|
||||||
|
config_entry: TibberPricesConfigEntry,
|
||||||
|
dynamic_attrs: dict | None = None,
|
||||||
|
is_on: bool | None = None,
|
||||||
|
) -> dict | None:
|
||||||
|
"""
|
||||||
|
Build async extra state attributes for binary sensors.
|
||||||
|
|
||||||
|
Adds icon_color and translated descriptions.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
entity_key: Entity key (e.g., "best_price_period")
|
||||||
|
translation_key: Translation key for entity
|
||||||
|
hass: Home Assistant instance
|
||||||
|
config_entry: Config entry with options (keyword-only)
|
||||||
|
dynamic_attrs: Dynamic attributes from attribute getter (keyword-only)
|
||||||
|
is_on: Binary sensor state (keyword-only)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Complete attributes dict with descriptions
|
||||||
|
|
||||||
|
"""
|
||||||
|
attributes = {}
|
||||||
|
|
||||||
|
# Add dynamic attributes if available
|
||||||
|
if dynamic_attrs:
|
||||||
|
# Copy and remove internal fields before exposing to user
|
||||||
|
clean_attrs = {k: v for k, v in dynamic_attrs.items() if not k.startswith("_")}
|
||||||
|
attributes.update(clean_attrs)
|
||||||
|
|
||||||
|
# Add icon_color for best/peak price period sensors using shared utility
|
||||||
|
add_icon_color_attribute(attributes, entity_key, is_on=is_on)
|
||||||
|
|
||||||
|
# Add description from the custom translations file
|
||||||
|
if translation_key and hass is not None:
|
||||||
|
# Get user's language preference
|
||||||
|
language = hass.config.language if hass.config.language else "en"
|
||||||
|
|
||||||
|
# Add basic description
|
||||||
|
description = await async_get_entity_description(
|
||||||
|
hass,
|
||||||
|
"binary_sensor",
|
||||||
|
translation_key,
|
||||||
|
language,
|
||||||
|
"description",
|
||||||
|
)
|
||||||
|
if description:
|
||||||
|
attributes["description"] = description
|
||||||
|
|
||||||
|
# Check if extended descriptions are enabled in the config
|
||||||
|
extended_descriptions = config_entry.options.get(
|
||||||
|
CONF_EXTENDED_DESCRIPTIONS,
|
||||||
|
config_entry.data.get(CONF_EXTENDED_DESCRIPTIONS, DEFAULT_EXTENDED_DESCRIPTIONS),
|
||||||
|
)
|
||||||
|
|
||||||
|
# Add extended descriptions if enabled
|
||||||
|
if extended_descriptions:
|
||||||
|
# Add long description if available
|
||||||
|
long_desc = await async_get_entity_description(
|
||||||
|
hass,
|
||||||
|
"binary_sensor",
|
||||||
|
translation_key,
|
||||||
|
language,
|
||||||
|
"long_description",
|
||||||
|
)
|
||||||
|
if long_desc:
|
||||||
|
attributes["long_description"] = long_desc
|
||||||
|
|
||||||
|
# Add usage tips if available
|
||||||
|
usage_tips = await async_get_entity_description(
|
||||||
|
hass,
|
||||||
|
"binary_sensor",
|
||||||
|
translation_key,
|
||||||
|
language,
|
||||||
|
"usage_tips",
|
||||||
|
)
|
||||||
|
if usage_tips:
|
||||||
|
attributes["usage_tips"] = usage_tips
|
||||||
|
|
||||||
|
return attributes if attributes else None
|
||||||
|
|
||||||
|
|
||||||
|
def build_sync_extra_state_attributes( # noqa: PLR0913
|
||||||
|
entity_key: str,
|
||||||
|
translation_key: str | None,
|
||||||
|
hass: HomeAssistant,
|
||||||
|
*,
|
||||||
|
config_entry: TibberPricesConfigEntry,
|
||||||
|
dynamic_attrs: dict | None = None,
|
||||||
|
is_on: bool | None = None,
|
||||||
|
) -> dict | None:
|
||||||
|
"""
|
||||||
|
Build synchronous extra state attributes for binary sensors.
|
||||||
|
|
||||||
|
Adds icon_color and cached translated descriptions.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
entity_key: Entity key (e.g., "best_price_period")
|
||||||
|
translation_key: Translation key for entity
|
||||||
|
hass: Home Assistant instance
|
||||||
|
config_entry: Config entry with options (keyword-only)
|
||||||
|
dynamic_attrs: Dynamic attributes from attribute getter (keyword-only)
|
||||||
|
is_on: Binary sensor state (keyword-only)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Complete attributes dict with cached descriptions
|
||||||
|
|
||||||
|
"""
|
||||||
|
attributes = {}
|
||||||
|
|
||||||
|
# Add dynamic attributes if available
|
||||||
|
if dynamic_attrs:
|
||||||
|
# Copy and remove internal fields before exposing to user
|
||||||
|
clean_attrs = {k: v for k, v in dynamic_attrs.items() if not k.startswith("_")}
|
||||||
|
attributes.update(clean_attrs)
|
||||||
|
|
||||||
|
# Add icon_color for best/peak price period sensors using shared utility
|
||||||
|
add_icon_color_attribute(attributes, entity_key, is_on=is_on)
|
||||||
|
|
||||||
|
# Add descriptions from the cache (non-blocking)
|
||||||
|
if translation_key and hass is not None:
|
||||||
|
# Get user's language preference
|
||||||
|
language = hass.config.language if hass.config.language else "en"
|
||||||
|
|
||||||
|
# Add basic description from cache
|
||||||
|
description = get_entity_description(
|
||||||
|
"binary_sensor",
|
||||||
|
translation_key,
|
||||||
|
language,
|
||||||
|
"description",
|
||||||
|
)
|
||||||
|
if description:
|
||||||
|
attributes["description"] = description
|
||||||
|
|
||||||
|
# Check if extended descriptions are enabled in the config
|
||||||
|
extended_descriptions = config_entry.options.get(
|
||||||
|
CONF_EXTENDED_DESCRIPTIONS,
|
||||||
|
config_entry.data.get(CONF_EXTENDED_DESCRIPTIONS, DEFAULT_EXTENDED_DESCRIPTIONS),
|
||||||
|
)
|
||||||
|
|
||||||
|
# Add extended descriptions if enabled
|
||||||
|
if extended_descriptions:
|
||||||
|
# Add long description from cache
|
||||||
|
long_desc = get_entity_description(
|
||||||
|
"binary_sensor",
|
||||||
|
translation_key,
|
||||||
|
language,
|
||||||
|
"long_description",
|
||||||
|
)
|
||||||
|
if long_desc:
|
||||||
|
attributes["long_description"] = long_desc
|
||||||
|
|
||||||
|
# Add usage tips from cache
|
||||||
|
usage_tips = get_entity_description(
|
||||||
|
"binary_sensor",
|
||||||
|
translation_key,
|
||||||
|
language,
|
||||||
|
"usage_tips",
|
||||||
|
)
|
||||||
|
if usage_tips:
|
||||||
|
attributes["usage_tips"] = usage_tips
|
||||||
|
|
||||||
|
return attributes if attributes else None
|
||||||
283
custom_components/tibber_prices/binary_sensor/core.py
Normal file
283
custom_components/tibber_prices/binary_sensor/core.py
Normal file
|
|
@ -0,0 +1,283 @@
|
||||||
|
"""Binary sensor core class for tibber_prices."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from datetime import timedelta
|
||||||
|
from typing import TYPE_CHECKING
|
||||||
|
|
||||||
|
from custom_components.tibber_prices.coordinator import TIME_SENSITIVE_ENTITY_KEYS
|
||||||
|
from custom_components.tibber_prices.entity import TibberPricesEntity
|
||||||
|
from custom_components.tibber_prices.entity_utils import get_binary_sensor_icon
|
||||||
|
from homeassistant.components.binary_sensor import (
|
||||||
|
BinarySensorEntity,
|
||||||
|
BinarySensorEntityDescription,
|
||||||
|
)
|
||||||
|
from homeassistant.core import callback
|
||||||
|
from homeassistant.util import dt as dt_util
|
||||||
|
|
||||||
|
from .attributes import (
|
||||||
|
build_async_extra_state_attributes,
|
||||||
|
build_sync_extra_state_attributes,
|
||||||
|
get_price_intervals_attributes,
|
||||||
|
get_tomorrow_data_available_attributes,
|
||||||
|
)
|
||||||
|
from .definitions import (
|
||||||
|
MIN_TOMORROW_INTERVALS_15MIN,
|
||||||
|
PERIOD_LOOKAHEAD_HOURS,
|
||||||
|
)
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from collections.abc import Callable
|
||||||
|
|
||||||
|
from custom_components.tibber_prices.coordinator import (
|
||||||
|
TibberPricesDataUpdateCoordinator,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
|
||||||
|
"""tibber_prices binary_sensor class."""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
coordinator: TibberPricesDataUpdateCoordinator,
|
||||||
|
entity_description: BinarySensorEntityDescription,
|
||||||
|
) -> None:
|
||||||
|
"""Initialize the binary_sensor class."""
|
||||||
|
super().__init__(coordinator)
|
||||||
|
self.entity_description = entity_description
|
||||||
|
self._attr_unique_id = f"{coordinator.config_entry.entry_id}_{entity_description.key}"
|
||||||
|
self._state_getter: Callable | None = self._get_state_getter()
|
||||||
|
self._attribute_getter: Callable | None = self._get_attribute_getter()
|
||||||
|
self._time_sensitive_remove_listener: Callable | None = None
|
||||||
|
|
||||||
|
async def async_added_to_hass(self) -> None:
|
||||||
|
"""When entity is added to hass."""
|
||||||
|
await super().async_added_to_hass()
|
||||||
|
|
||||||
|
# Register with coordinator for time-sensitive updates if applicable
|
||||||
|
if self.entity_description.key in TIME_SENSITIVE_ENTITY_KEYS:
|
||||||
|
self._time_sensitive_remove_listener = self.coordinator.async_add_time_sensitive_listener(
|
||||||
|
self._handle_time_sensitive_update
|
||||||
|
)
|
||||||
|
|
||||||
|
async def async_will_remove_from_hass(self) -> None:
|
||||||
|
"""When entity will be removed from hass."""
|
||||||
|
await super().async_will_remove_from_hass()
|
||||||
|
|
||||||
|
# Remove time-sensitive listener if registered
|
||||||
|
if self._time_sensitive_remove_listener:
|
||||||
|
self._time_sensitive_remove_listener()
|
||||||
|
self._time_sensitive_remove_listener = None
|
||||||
|
|
||||||
|
@callback
|
||||||
|
def _handle_time_sensitive_update(self) -> None:
|
||||||
|
"""Handle time-sensitive update from coordinator."""
|
||||||
|
self.async_write_ha_state()
|
||||||
|
|
||||||
|
def _get_state_getter(self) -> Callable | None:
|
||||||
|
"""Return the appropriate state getter method based on the sensor type."""
|
||||||
|
key = self.entity_description.key
|
||||||
|
|
||||||
|
if key == "peak_price_period":
|
||||||
|
return self._peak_price_state
|
||||||
|
if key == "best_price_period":
|
||||||
|
return self._best_price_state
|
||||||
|
if key == "connection":
|
||||||
|
return lambda: True if self.coordinator.data else None
|
||||||
|
if key == "tomorrow_data_available":
|
||||||
|
return self._tomorrow_data_available_state
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _best_price_state(self) -> bool | None:
|
||||||
|
"""Return True if the current time is within a best price period."""
|
||||||
|
if not self.coordinator.data:
|
||||||
|
return None
|
||||||
|
attrs = get_price_intervals_attributes(self.coordinator.data, reverse_sort=False)
|
||||||
|
if not attrs:
|
||||||
|
return False # Should not happen, but safety fallback
|
||||||
|
start = attrs.get("start")
|
||||||
|
end = attrs.get("end")
|
||||||
|
if not start or not end:
|
||||||
|
return False # No period found = sensor is off
|
||||||
|
now = dt_util.now()
|
||||||
|
return start <= now < end
|
||||||
|
|
||||||
|
def _peak_price_state(self) -> bool | None:
|
||||||
|
"""Return True if the current time is within a peak price period."""
|
||||||
|
if not self.coordinator.data:
|
||||||
|
return None
|
||||||
|
attrs = get_price_intervals_attributes(self.coordinator.data, reverse_sort=True)
|
||||||
|
if not attrs:
|
||||||
|
return False # Should not happen, but safety fallback
|
||||||
|
start = attrs.get("start")
|
||||||
|
end = attrs.get("end")
|
||||||
|
if not start or not end:
|
||||||
|
return False # No period found = sensor is off
|
||||||
|
now = dt_util.now()
|
||||||
|
return start <= now < end
|
||||||
|
|
||||||
|
def _tomorrow_data_available_state(self) -> bool | None:
|
||||||
|
"""Return True if tomorrow's data is fully available, False if not, None if unknown."""
|
||||||
|
if not self.coordinator.data:
|
||||||
|
return None
|
||||||
|
price_info = self.coordinator.data.get("priceInfo", {})
|
||||||
|
tomorrow_prices = price_info.get("tomorrow", [])
|
||||||
|
interval_count = len(tomorrow_prices)
|
||||||
|
if interval_count == MIN_TOMORROW_INTERVALS_15MIN:
|
||||||
|
return True
|
||||||
|
if interval_count == 0:
|
||||||
|
return False
|
||||||
|
return False
|
||||||
|
|
||||||
|
def _get_tomorrow_data_available_attributes(self) -> dict | None:
|
||||||
|
"""Return attributes for tomorrow_data_available binary sensor."""
|
||||||
|
return get_tomorrow_data_available_attributes(self.coordinator.data)
|
||||||
|
|
||||||
|
def _get_attribute_getter(self) -> Callable | None:
|
||||||
|
"""Return the appropriate attribute getter method based on the sensor type."""
|
||||||
|
key = self.entity_description.key
|
||||||
|
|
||||||
|
if key == "peak_price_period":
|
||||||
|
return lambda: get_price_intervals_attributes(self.coordinator.data, reverse_sort=True)
|
||||||
|
if key == "best_price_period":
|
||||||
|
return lambda: get_price_intervals_attributes(self.coordinator.data, reverse_sort=False)
|
||||||
|
if key == "tomorrow_data_available":
|
||||||
|
return self._get_tomorrow_data_available_attributes
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
@property
|
||||||
|
def is_on(self) -> bool | None:
|
||||||
|
"""Return true if the binary_sensor is on."""
|
||||||
|
try:
|
||||||
|
if not self.coordinator.data or not self._state_getter:
|
||||||
|
return None
|
||||||
|
|
||||||
|
return self._state_getter()
|
||||||
|
|
||||||
|
except (KeyError, ValueError, TypeError) as ex:
|
||||||
|
self.coordinator.logger.exception(
|
||||||
|
"Error getting binary sensor state",
|
||||||
|
extra={
|
||||||
|
"error": str(ex),
|
||||||
|
"entity": self.entity_description.key,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
return None
|
||||||
|
|
||||||
|
@property
|
||||||
|
def icon(self) -> str | None:
|
||||||
|
"""Return the icon based on binary sensor state."""
|
||||||
|
key = self.entity_description.key
|
||||||
|
|
||||||
|
# Use shared icon utility
|
||||||
|
icon = get_binary_sensor_icon(
|
||||||
|
key,
|
||||||
|
is_on=self.is_on,
|
||||||
|
has_future_periods_callback=self._has_future_periods,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Fall back to static icon from entity description
|
||||||
|
return icon or self.entity_description.icon
|
||||||
|
|
||||||
|
def _has_future_periods(self) -> bool:
|
||||||
|
"""
|
||||||
|
Check if there are periods starting within the next 6 hours.
|
||||||
|
|
||||||
|
Returns True if any period starts between now and PERIOD_LOOKAHEAD_HOURS from now.
|
||||||
|
This provides a practical planning horizon instead of hard midnight cutoff.
|
||||||
|
"""
|
||||||
|
if not self._attribute_getter:
|
||||||
|
return False
|
||||||
|
|
||||||
|
attrs = self._attribute_getter()
|
||||||
|
if not attrs or "periods" not in attrs:
|
||||||
|
return False
|
||||||
|
|
||||||
|
now = dt_util.now()
|
||||||
|
horizon = now + timedelta(hours=PERIOD_LOOKAHEAD_HOURS)
|
||||||
|
periods = attrs.get("periods", [])
|
||||||
|
|
||||||
|
# Check if any period starts within the look-ahead window
|
||||||
|
for period in periods:
|
||||||
|
start_str = period.get("start")
|
||||||
|
if start_str:
|
||||||
|
# Parse datetime if it's a string, otherwise use as-is
|
||||||
|
start_time = dt_util.parse_datetime(start_str) if isinstance(start_str, str) else start_str
|
||||||
|
|
||||||
|
if start_time:
|
||||||
|
start_time_local = dt_util.as_local(start_time)
|
||||||
|
# Period starts in the future but within our horizon
|
||||||
|
if now < start_time_local <= horizon:
|
||||||
|
return True
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
@property
|
||||||
|
async def async_extra_state_attributes(self) -> dict | None:
|
||||||
|
"""Return additional state attributes asynchronously."""
|
||||||
|
try:
|
||||||
|
# Get the dynamic attributes if the getter is available
|
||||||
|
if not self.coordinator.data:
|
||||||
|
return None
|
||||||
|
|
||||||
|
dynamic_attrs = None
|
||||||
|
if self._attribute_getter:
|
||||||
|
dynamic_attrs = self._attribute_getter()
|
||||||
|
|
||||||
|
# Use extracted function to build all attributes
|
||||||
|
return await build_async_extra_state_attributes(
|
||||||
|
self.entity_description.key,
|
||||||
|
self.entity_description.translation_key,
|
||||||
|
self.hass,
|
||||||
|
config_entry=self.coordinator.config_entry,
|
||||||
|
dynamic_attrs=dynamic_attrs,
|
||||||
|
is_on=self.is_on,
|
||||||
|
)
|
||||||
|
|
||||||
|
except (KeyError, ValueError, TypeError) as ex:
|
||||||
|
self.coordinator.logger.exception(
|
||||||
|
"Error getting binary sensor attributes",
|
||||||
|
extra={
|
||||||
|
"error": str(ex),
|
||||||
|
"entity": self.entity_description.key,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
return None
|
||||||
|
|
||||||
|
@property
|
||||||
|
def extra_state_attributes(self) -> dict | None:
|
||||||
|
"""Return additional state attributes synchronously."""
|
||||||
|
try:
|
||||||
|
# Start with dynamic attributes if available
|
||||||
|
if not self.coordinator.data:
|
||||||
|
return None
|
||||||
|
|
||||||
|
dynamic_attrs = None
|
||||||
|
if self._attribute_getter:
|
||||||
|
dynamic_attrs = self._attribute_getter()
|
||||||
|
|
||||||
|
# Use extracted function to build all attributes
|
||||||
|
return build_sync_extra_state_attributes(
|
||||||
|
self.entity_description.key,
|
||||||
|
self.entity_description.translation_key,
|
||||||
|
self.hass,
|
||||||
|
config_entry=self.coordinator.config_entry,
|
||||||
|
dynamic_attrs=dynamic_attrs,
|
||||||
|
is_on=self.is_on,
|
||||||
|
)
|
||||||
|
|
||||||
|
except (KeyError, ValueError, TypeError) as ex:
|
||||||
|
self.coordinator.logger.exception(
|
||||||
|
"Error getting binary sensor attributes",
|
||||||
|
extra={
|
||||||
|
"error": str(ex),
|
||||||
|
"entity": self.entity_description.key,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
return None
|
||||||
|
|
||||||
|
async def async_update(self) -> None:
|
||||||
|
"""Force a refresh when homeassistant.update_entity is called."""
|
||||||
|
await self.coordinator.async_request_refresh()
|
||||||
46
custom_components/tibber_prices/binary_sensor/definitions.py
Normal file
46
custom_components/tibber_prices/binary_sensor/definitions.py
Normal file
|
|
@ -0,0 +1,46 @@
|
||||||
|
"""Binary sensor entity descriptions for tibber_prices."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from homeassistant.components.binary_sensor import (
|
||||||
|
BinarySensorDeviceClass,
|
||||||
|
BinarySensorEntityDescription,
|
||||||
|
)
|
||||||
|
from homeassistant.const import EntityCategory
|
||||||
|
|
||||||
|
# Constants
|
||||||
|
MINUTES_PER_INTERVAL = 15
|
||||||
|
MIN_TOMORROW_INTERVALS_15MIN = 96
|
||||||
|
|
||||||
|
# Look-ahead window for future period detection (hours)
|
||||||
|
# Icons will show "waiting" state if a period starts within this window
|
||||||
|
PERIOD_LOOKAHEAD_HOURS = 6
|
||||||
|
|
||||||
|
ENTITY_DESCRIPTIONS = (
|
||||||
|
BinarySensorEntityDescription(
|
||||||
|
key="peak_price_period",
|
||||||
|
translation_key="peak_price_period",
|
||||||
|
name="Peak Price Interval",
|
||||||
|
icon="mdi:clock-alert",
|
||||||
|
),
|
||||||
|
BinarySensorEntityDescription(
|
||||||
|
key="best_price_period",
|
||||||
|
translation_key="best_price_period",
|
||||||
|
name="Best Price Interval",
|
||||||
|
icon="mdi:clock-check",
|
||||||
|
),
|
||||||
|
BinarySensorEntityDescription(
|
||||||
|
key="connection",
|
||||||
|
translation_key="connection",
|
||||||
|
name="Tibber API Connection",
|
||||||
|
device_class=BinarySensorDeviceClass.CONNECTIVITY,
|
||||||
|
entity_category=EntityCategory.DIAGNOSTIC,
|
||||||
|
),
|
||||||
|
BinarySensorEntityDescription(
|
||||||
|
key="tomorrow_data_available",
|
||||||
|
translation_key="tomorrow_data_available",
|
||||||
|
name="Tomorrow's Data Available",
|
||||||
|
icon="mdi:calendar-check",
|
||||||
|
entity_category=EntityCategory.DIAGNOSTIC,
|
||||||
|
),
|
||||||
|
)
|
||||||
Loading…
Reference in a new issue