feat: Enhance Tibber Prices integration with new configuration options and improved data handling

- Added new configuration options for minimum distance from average price for best and peak prices.
- Updated default values for best and peak price flexibility.
- Improved coordinator to handle midnight turnover and data rotation more effectively.
- Refactored entity initialization to streamline device information retrieval.
- Updated sensor attributes to use more descriptive names for price values.
- Enhanced translations for new configuration options in English and German.
- Improved unit tests for coordinator functionality, ensuring proper cleanup and async handling.
This commit is contained in:
Julian Pawlowski 2025-11-06 11:43:22 +00:00
parent 03f09818d1
commit 63904fff39
11 changed files with 620 additions and 232 deletions

View file

@ -120,6 +120,4 @@ async def async_reload_entry(
entry: TibberPricesConfigEntry,
) -> None:
"""Reload config entry."""
LOGGER.debug(f"[tibber_prices] async_reload_entry called for entry_id={entry.entry_id}")
await async_unload_entry(hass, entry)
await async_setup_entry(hass, entry)
await hass.config_entries.async_reload(entry.entry_id)

View file

@ -28,11 +28,15 @@ if TYPE_CHECKING:
from .const import (
CONF_BEST_PRICE_FLEX,
CONF_BEST_PRICE_MIN_DISTANCE_FROM_AVG,
CONF_EXTENDED_DESCRIPTIONS,
CONF_PEAK_PRICE_FLEX,
CONF_PEAK_PRICE_MIN_DISTANCE_FROM_AVG,
DEFAULT_BEST_PRICE_FLEX,
DEFAULT_BEST_PRICE_MIN_DISTANCE_FROM_AVG,
DEFAULT_EXTENDED_DESCRIPTIONS,
DEFAULT_PEAK_PRICE_FLEX,
DEFAULT_PEAK_PRICE_MIN_DISTANCE_FROM_AVG,
async_get_entity_description,
get_entity_description,
)
@ -280,7 +284,7 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
# Calculate difference from average price for the day
avg_diff = new_interval["price"] - annotation_ctx["avg_price"]
new_interval["price_diff_from_avg"] = round(avg_diff, 4)
new_interval["price_diff_from_avg_ct"] = round(avg_diff * 100, 2)
new_interval["price_diff_from_avg_minor"] = round(avg_diff * 100, 2)
avg_diff_percent = (
((new_interval["price"] - annotation_ctx["avg_price"]) / annotation_ctx["avg_price"]) * 100
if annotation_ctx["avg_price"] != 0
@ -291,22 +295,24 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
trailing_avg = annotation_ctx.get("trailing_24h_avg", 0.0)
trailing_avg_diff = new_interval["price"] - trailing_avg
new_interval["price_diff_from_trailing_24h_avg"] = round(trailing_avg_diff, 4)
new_interval["price_diff_from_trailing_24h_avg_ct"] = round(trailing_avg_diff * 100, 2)
new_interval["price_diff_from_trailing_24h_avg_minor"] = round(trailing_avg_diff * 100, 2)
trailing_avg_diff_percent = (
((new_interval["price"] - trailing_avg) / trailing_avg) * 100 if trailing_avg != 0 else 0.0
)
new_interval["price_diff_from_trailing_24h_avg_" + PERCENTAGE] = round(trailing_avg_diff_percent, 2)
new_interval["trailing_24h_avg_price"] = round(trailing_avg, 4)
new_interval["trailing_24h_avg_price_minor"] = round(trailing_avg * 100, 2)
# Calculate difference from leading 24-hour average
leading_avg = annotation_ctx.get("leading_24h_avg", 0.0)
leading_avg_diff = new_interval["price"] - leading_avg
new_interval["price_diff_from_leading_24h_avg"] = round(leading_avg_diff, 4)
new_interval["price_diff_from_leading_24h_avg_ct"] = round(leading_avg_diff * 100, 2)
new_interval["price_diff_from_leading_24h_avg_minor"] = round(leading_avg_diff * 100, 2)
leading_avg_diff_percent = (
((new_interval["price"] - leading_avg) / leading_avg) * 100 if leading_avg != 0 else 0.0
)
new_interval["price_diff_from_leading_24h_avg_" + PERCENTAGE] = round(leading_avg_diff_percent, 2)
new_interval["leading_24h_avg_price"] = round(leading_avg, 4)
new_interval["leading_24h_avg_price_minor"] = round(leading_avg * 100, 2)
return new_interval
def _annotate_period_intervals(
@ -330,15 +336,15 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
reference_type = "ref"
if reference_type == "min":
diff_key = "price_diff_from_min"
diff_ct_key = "price_diff_from_min_ct"
diff_ct_key = "price_diff_from_min_minor"
diff_pct_key = "price_diff_from_min_" + PERCENTAGE
elif reference_type == "max":
diff_key = "price_diff_from_max"
diff_ct_key = "price_diff_from_max_ct"
diff_ct_key = "price_diff_from_max_minor"
diff_pct_key = "price_diff_from_max_" + PERCENTAGE
else:
diff_key = "price_diff"
diff_ct_key = "price_diff_ct"
diff_ct_key = "price_diff_minor"
diff_pct_key = "price_diff_" + PERCENTAGE
result = []
period_count = len(periods)
@ -415,8 +421,7 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
def _build_periods(
self,
all_prices: list[dict],
ref_prices: dict,
flex: float,
price_context: dict,
*,
reverse_sort: bool,
) -> list[list[dict]]:
@ -424,7 +429,24 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
Build periods, allowing periods to cross midnight (day boundary).
Strictly enforce flex threshold by percent diff, matching attribute calculation.
Additionally enforces:
1. Cap at daily average to prevent overlap between best and peak periods
2. Minimum distance from average to ensure meaningful price difference
Args:
all_prices: All price data points
price_context: Dict with ref_prices, avg_prices, flex, and min_distance_from_avg
reverse_sort: True for peak price (descending), False for best price (ascending)
Returns:
List of periods, each period is a list of interval dicts
"""
ref_prices = price_context["ref_prices"]
avg_prices = price_context["avg_prices"]
flex = price_context["flex"]
min_distance_from_avg = price_context["min_distance_from_avg"]
periods: list[list[dict]] = []
current_period: list[dict] = []
last_ref_date = None
@ -435,18 +457,34 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
starts_at = dt_util.as_local(starts_at)
date = starts_at.date()
ref_price = ref_prices[date]
avg_price = avg_prices[date]
price = float(price_data["total"])
percent_diff = ((price - ref_price) / ref_price) * 100 if ref_price != 0 else 0.0
percent_diff = round(percent_diff, 2)
# For best price (flex >= 0): percent_diff <= flex*100 (prices up to flex% above reference)
# For peak price (flex <= 0): percent_diff >= flex*100 (prices down to |flex|% below reference)
in_flex = percent_diff <= flex * 100 if not reverse_sort else percent_diff >= flex * 100
# Cap at daily average to prevent overlap between best and peak periods
# Best price: only prices below average
# Peak price: only prices above average
within_avg_boundary = price <= avg_price if not reverse_sort else price >= avg_price
# Enforce minimum distance from average (in percentage terms)
# Best price: price must be at least min_distance_from_avg% below average
# Peak price: price must be at least min_distance_from_avg% above average
if not reverse_sort:
# Best price: price <= avg * (1 - min_distance_from_avg/100)
min_distance_threshold = avg_price * (1 - min_distance_from_avg / 100)
meets_min_distance = price <= min_distance_threshold
else:
# Peak price: price >= avg * (1 + min_distance_from_avg/100)
min_distance_threshold = avg_price * (1 + min_distance_from_avg / 100)
meets_min_distance = price >= min_distance_threshold
# Split period if day changes
if last_ref_date is not None and date != last_ref_date and current_period:
periods.append(current_period)
current_period = []
last_ref_date = date
if in_flex:
if in_flex and within_avg_boundary and meets_min_distance:
current_period.append(
{
"interval_hour": starts_at.hour,
@ -527,10 +565,19 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
CONF_BEST_PRICE_FLEX if not reverse_sort else CONF_PEAK_PRICE_FLEX,
DEFAULT_BEST_PRICE_FLEX if not reverse_sort else DEFAULT_PEAK_PRICE_FLEX,
)
min_distance_from_avg = self._get_flex_option(
CONF_BEST_PRICE_MIN_DISTANCE_FROM_AVG if not reverse_sort else CONF_PEAK_PRICE_MIN_DISTANCE_FROM_AVG,
DEFAULT_BEST_PRICE_MIN_DISTANCE_FROM_AVG if not reverse_sort else DEFAULT_PEAK_PRICE_MIN_DISTANCE_FROM_AVG,
)
price_context = {
"ref_prices": ref_prices,
"avg_prices": avg_price_by_day,
"flex": flex,
"min_distance_from_avg": min_distance_from_avg,
}
periods = self._build_periods(
all_prices,
ref_prices,
flex,
price_context,
reverse_sort=reverse_sort,
)
self._add_interval_ends(periods)

View file

@ -39,13 +39,17 @@ from .api import (
)
from .const import (
CONF_BEST_PRICE_FLEX,
CONF_BEST_PRICE_MIN_DISTANCE_FROM_AVG,
CONF_EXTENDED_DESCRIPTIONS,
CONF_PEAK_PRICE_FLEX,
CONF_PEAK_PRICE_MIN_DISTANCE_FROM_AVG,
CONF_PRICE_RATING_THRESHOLD_HIGH,
CONF_PRICE_RATING_THRESHOLD_LOW,
DEFAULT_BEST_PRICE_FLEX,
DEFAULT_BEST_PRICE_MIN_DISTANCE_FROM_AVG,
DEFAULT_EXTENDED_DESCRIPTIONS,
DEFAULT_PEAK_PRICE_FLEX,
DEFAULT_PEAK_PRICE_MIN_DISTANCE_FROM_AVG,
DEFAULT_PRICE_RATING_THRESHOLD_HIGH,
DEFAULT_PRICE_RATING_THRESHOLD_LOW,
DOMAIN,
@ -80,7 +84,7 @@ class TibberPricesFlowHandler(ConfigFlow, domain=DOMAIN):
@staticmethod
@callback
def async_get_options_flow(config_entry: ConfigEntry) -> OptionsFlow: # noqa: ARG004
def async_get_options_flow(_config_entry: ConfigEntry) -> OptionsFlow:
"""Create an options flow for this configentry."""
return TibberPricesOptionsFlowHandler()
@ -211,8 +215,8 @@ class TibberPricesFlowHandler(ConfigFlow, domain=DOMAIN):
def _get_home_title(home: dict) -> str:
"""Generate a user-friendly title for a home."""
title = home.get("appNickname")
if title:
return title
if title and title.strip():
return title.strip()
address = home.get("address", {})
if address:
@ -362,8 +366,8 @@ class TibberPricesSubentryFlowHandler(ConfigSubentryFlow):
def _get_home_title(self, home: dict) -> str:
"""Generate a user-friendly title for a home."""
title = home.get("appNickname")
if title:
return title
if title and title.strip():
return title.strip()
address = home.get("address", {})
if address:
@ -406,13 +410,19 @@ class TibberPricesSubentryFlowHandler(ConfigSubentryFlow):
class TibberPricesOptionsFlowHandler(OptionsFlow):
"""Handle options for tibber_prices entries."""
def __init__(self) -> None:
"""Initialize options flow."""
self._options: dict[str, Any] = {}
async def async_step_init(self, user_input: dict[str, Any] | None = None) -> ConfigFlowResult:
"""Manage the options."""
"""Manage the options - General Settings."""
# Initialize options from config_entry on first call
if not self._options:
self._options = dict(self.config_entry.options)
if user_input is not None:
return self.async_create_entry(
title="",
data=user_input,
)
self._options.update(user_input)
return await self.async_step_price_rating()
return self.async_show_form(
step_id="init",
@ -424,38 +434,23 @@ class TibberPricesOptionsFlowHandler(OptionsFlow):
CONF_EXTENDED_DESCRIPTIONS, DEFAULT_EXTENDED_DESCRIPTIONS
),
): BooleanSelector(),
vol.Optional(
CONF_BEST_PRICE_FLEX,
default=int(
self.config_entry.options.get(
CONF_BEST_PRICE_FLEX,
DEFAULT_BEST_PRICE_FLEX,
}
),
description_placeholders={
"user_login": self.config_entry.data.get("user_login", "N/A"),
},
)
),
): NumberSelector(
NumberSelectorConfig(
min=0,
max=100,
step=1,
mode=NumberSelectorMode.SLIDER,
),
),
vol.Optional(
CONF_PEAK_PRICE_FLEX,
default=int(
self.config_entry.options.get(
CONF_PEAK_PRICE_FLEX,
DEFAULT_PEAK_PRICE_FLEX,
)
),
): NumberSelector(
NumberSelectorConfig(
min=-100,
max=0,
step=1,
mode=NumberSelectorMode.SLIDER,
),
),
async def async_step_price_rating(self, user_input: dict[str, Any] | None = None) -> ConfigFlowResult:
"""Configure price rating thresholds."""
if user_input is not None:
self._options.update(user_input)
return await self.async_step_best_price()
return self.async_show_form(
step_id="price_rating",
data_schema=vol.Schema(
{
vol.Optional(
CONF_PRICE_RATING_THRESHOLD_LOW,
default=int(
@ -490,8 +485,96 @@ class TibberPricesOptionsFlowHandler(OptionsFlow):
),
}
),
description_placeholders={
"user_login": self.config_entry.data.get("user_login", "N/A"),
"unique_id": self.config_entry.unique_id or "unknown",
},
)
async def async_step_best_price(self, user_input: dict[str, Any] | None = None) -> ConfigFlowResult:
"""Configure best price period settings."""
if user_input is not None:
self._options.update(user_input)
return await self.async_step_peak_price()
return self.async_show_form(
step_id="best_price",
data_schema=vol.Schema(
{
vol.Optional(
CONF_BEST_PRICE_FLEX,
default=int(
self.config_entry.options.get(
CONF_BEST_PRICE_FLEX,
DEFAULT_BEST_PRICE_FLEX,
)
),
): NumberSelector(
NumberSelectorConfig(
min=0,
max=100,
step=1,
mode=NumberSelectorMode.SLIDER,
),
),
vol.Optional(
CONF_BEST_PRICE_MIN_DISTANCE_FROM_AVG,
default=int(
self.config_entry.options.get(
CONF_BEST_PRICE_MIN_DISTANCE_FROM_AVG,
DEFAULT_BEST_PRICE_MIN_DISTANCE_FROM_AVG,
)
),
): NumberSelector(
NumberSelectorConfig(
min=0,
max=50,
step=1,
mode=NumberSelectorMode.SLIDER,
),
),
}
),
)
async def async_step_peak_price(self, user_input: dict[str, Any] | None = None) -> ConfigFlowResult:
"""Configure peak price period settings."""
if user_input is not None:
self._options.update(user_input)
return self.async_create_entry(title="", data=self._options)
return self.async_show_form(
step_id="peak_price",
data_schema=vol.Schema(
{
vol.Optional(
CONF_PEAK_PRICE_FLEX,
default=int(
self.config_entry.options.get(
CONF_PEAK_PRICE_FLEX,
DEFAULT_PEAK_PRICE_FLEX,
)
),
): NumberSelector(
NumberSelectorConfig(
min=-100,
max=0,
step=1,
mode=NumberSelectorMode.SLIDER,
),
),
vol.Optional(
CONF_PEAK_PRICE_MIN_DISTANCE_FROM_AVG,
default=int(
self.config_entry.options.get(
CONF_PEAK_PRICE_MIN_DISTANCE_FROM_AVG,
DEFAULT_PEAK_PRICE_MIN_DISTANCE_FROM_AVG,
)
),
): NumberSelector(
NumberSelectorConfig(
min=0,
max=50,
step=1,
mode=NumberSelectorMode.SLIDER,
),
),
}
),
)

View file

@ -23,6 +23,8 @@ DOMAIN = "tibber_prices"
CONF_EXTENDED_DESCRIPTIONS = "extended_descriptions"
CONF_BEST_PRICE_FLEX = "best_price_flex"
CONF_PEAK_PRICE_FLEX = "peak_price_flex"
CONF_BEST_PRICE_MIN_DISTANCE_FROM_AVG = "best_price_min_distance_from_avg"
CONF_PEAK_PRICE_MIN_DISTANCE_FROM_AVG = "peak_price_min_distance_from_avg"
CONF_PRICE_RATING_THRESHOLD_LOW = "price_rating_threshold_low"
CONF_PRICE_RATING_THRESHOLD_HIGH = "price_rating_threshold_high"
@ -31,8 +33,10 @@ ATTRIBUTION = "Data provided by Tibber"
# Integration name should match manifest.json
DEFAULT_NAME = "Tibber Price Information & Ratings"
DEFAULT_EXTENDED_DESCRIPTIONS = False
DEFAULT_BEST_PRICE_FLEX = 20 # 20% flexibility for best price (user-facing, percent)
DEFAULT_PEAK_PRICE_FLEX = -20 # 20% flexibility for peak price (user-facing, percent)
DEFAULT_BEST_PRICE_FLEX = 15 # 15% flexibility for best price (user-facing, percent)
DEFAULT_PEAK_PRICE_FLEX = -15 # 15% flexibility for peak price (user-facing, percent)
DEFAULT_BEST_PRICE_MIN_DISTANCE_FROM_AVG = 2 # 2% minimum distance from daily average for best price
DEFAULT_PEAK_PRICE_MIN_DISTANCE_FROM_AVG = 2 # 2% minimum distance from daily average for peak price
DEFAULT_PRICE_RATING_THRESHOLD_LOW = -10 # Default rating threshold low percentage
DEFAULT_PRICE_RATING_THRESHOLD_HIGH = 10 # Default rating threshold high percentage

View file

@ -3,13 +3,14 @@
from __future__ import annotations
import logging
from datetime import datetime, timedelta
from datetime import date, datetime, timedelta
from typing import TYPE_CHECKING, Any
from homeassistant.const import CONF_ACCESS_TOKEN
from homeassistant.core import HomeAssistant, callback
from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback
from homeassistant.exceptions import ConfigEntryAuthFailed
from homeassistant.helpers import aiohttp_client
from homeassistant.helpers.event import async_track_utc_time_change
from homeassistant.helpers.storage import Store
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from homeassistant.util import dt as dt_util
@ -40,12 +41,18 @@ _LOGGER = logging.getLogger(__name__)
# Storage version for storing data
STORAGE_VERSION = 1
# Update interval - fetch data every 15 minutes
# Update interval - fetch data every 15 minutes (when data is incomplete)
UPDATE_INTERVAL = timedelta(minutes=15)
# Update interval when all data is available - every 4 hours (reduce API calls)
UPDATE_INTERVAL_COMPLETE = timedelta(hours=4)
# Quarter-hour boundaries for entity state updates (minutes: 00, 15, 30, 45)
QUARTER_HOUR_BOUNDARIES = (0, 15, 30, 45)
# Hour after which tomorrow's price data is expected (13:00 local time)
TOMORROW_DATA_CHECK_HOUR = 13
class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
"""Enhanced coordinator with main/subentry pattern and comprehensive caching."""
@ -82,66 +89,131 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
self._cached_price_data: dict[str, Any] | None = None
self._last_price_update: datetime | None = None
# Track the last date we checked for midnight turnover
self._last_midnight_check: datetime | None = None
# Track if this is the main entry (first one created)
self._is_main_entry = not self._has_existing_main_coordinator()
# Quarter-hour entity refresh timer
self._quarter_hour_timer_handle: Any = None
# Log prefix for identifying this coordinator instance
self._log_prefix = f"[{config_entry.title}]"
# Quarter-hour entity refresh timer (runs at :00, :15, :30, :45)
self._quarter_hour_timer_cancel: CALLBACK_TYPE | None = None
self._schedule_quarter_hour_refresh()
def _log(self, level: str, message: str, *args: Any, **kwargs: Any) -> None:
"""Log with coordinator-specific prefix."""
prefixed_message = f"{self._log_prefix} {message}"
getattr(_LOGGER, level)(prefixed_message, *args, **kwargs)
def _schedule_quarter_hour_refresh(self) -> None:
"""Schedule the next quarter-hour entity refresh."""
now = dt_util.utcnow()
current_minute = now.minute
# Find the next quarter-hour boundary
for boundary in QUARTER_HOUR_BOUNDARIES:
if boundary > current_minute:
minutes_to_wait = boundary - current_minute
break
else:
# All boundaries passed, go to first boundary of next hour
minutes_to_wait = (60 - current_minute) + QUARTER_HOUR_BOUNDARIES[0]
# Calculate the exact time of the next boundary
next_refresh = now + timedelta(minutes=minutes_to_wait)
next_refresh = next_refresh.replace(second=0, microsecond=0)
"""Schedule the next quarter-hour entity refresh using Home Assistant's time tracking."""
# Cancel any existing timer
if self._quarter_hour_timer_handle:
self._quarter_hour_timer_handle.cancel()
if self._quarter_hour_timer_cancel:
self._quarter_hour_timer_cancel()
self._quarter_hour_timer_cancel = None
# Schedule the refresh
self._quarter_hour_timer_handle = self.hass.loop.call_at(
self.hass.loop.time() + (next_refresh - now).total_seconds(),
# Use Home Assistant's async_track_utc_time_change to trigger exactly at quarter-hour boundaries
# This ensures we trigger at :00, :15, :30, :45 seconds=1 to avoid triggering too early
self._quarter_hour_timer_cancel = async_track_utc_time_change(
self.hass,
self._handle_quarter_hour_refresh,
minute=QUARTER_HOUR_BOUNDARIES,
second=1,
)
_LOGGER.debug(
"Scheduled entity refresh at %s (in %d minutes)",
next_refresh.isoformat(),
minutes_to_wait,
self._log(
"debug",
"Scheduled quarter-hour refresh for boundaries: %s (at second=1)",
QUARTER_HOUR_BOUNDARIES,
)
@callback
def _handle_quarter_hour_refresh(self) -> None:
"""Handle quarter-hour entity refresh by triggering async state updates."""
_LOGGER.debug("Quarter-hour refresh triggered at %s", dt_util.utcnow().isoformat())
def _handle_quarter_hour_refresh(self, _now: datetime | None = None) -> None:
"""Handle quarter-hour entity refresh - check for midnight turnover and update entities."""
now = dt_util.now()
self._log("debug", "Quarter-hour refresh triggered at %s", now.isoformat())
# Notify all listeners to update their state without fetching fresh data
# Check if midnight has passed since last check
midnight_turnover_performed = self._check_and_handle_midnight_turnover(now)
if midnight_turnover_performed:
self._log("info", "Midnight turnover detected and performed during quarter-hour refresh")
# Schedule cache save asynchronously (we're in a callback)
self.hass.async_create_task(self._store_cache())
# Entity update already done in _check_and_handle_midnight_turnover
# Skip the regular update to avoid double-update
else:
# Regular quarter-hour refresh - notify listeners to update their state
# This causes entity state properties to be re-evaluated with the current time
# Using async_update_listeners() instead of async_set_updated_data() to avoid
# interfering with the coordinator's update timing
self.async_update_listeners()
# Schedule the next quarter-hour refresh
self._schedule_quarter_hour_refresh()
@callback
def _check_and_handle_midnight_turnover(self, now: datetime) -> bool:
"""
Check if midnight has passed and perform data rotation if needed.
This is called by the quarter-hour timer to ensure timely rotation
without waiting for the next API update cycle.
Returns:
True if midnight turnover was performed, False otherwise
"""
current_date = now.date()
# First time check - initialize
if self._last_midnight_check is None:
self._last_midnight_check = now
return False
last_check_date = self._last_midnight_check.date()
# Check if we've crossed into a new day
if current_date > last_check_date:
self._log(
"debug",
"Midnight crossed: last_check=%s, current=%s",
last_check_date,
current_date,
)
# Perform rotation on cached data if available
if self._cached_price_data and "homes" in self._cached_price_data:
for home_id, home_data in self._cached_price_data["homes"].items():
if "price_info" in home_data:
price_info = home_data["price_info"]
rotated = self._perform_midnight_turnover(price_info)
home_data["price_info"] = rotated
self._log("debug", "Rotated price data for home %s", home_id)
# Update coordinator's data with enriched rotated data
if self.data:
# Re-transform data to ensure enrichment is applied to rotated data
if self.is_main_entry():
self.data = self._transform_data_for_main_entry(self._cached_price_data)
else:
# For subentry, we need to get data from main coordinator
# but we can update the timestamp to trigger entity refresh
self.data["timestamp"] = now
# Notify listeners about the updated data after rotation
self.async_update_listeners()
self._last_midnight_check = now
return True
self._last_midnight_check = now
return False
async def async_shutdown(self) -> None:
"""Shut down the coordinator and clean up timers."""
if self._quarter_hour_timer_handle:
self._quarter_hour_timer_handle.cancel()
self._quarter_hour_timer_handle = None
if self._quarter_hour_timer_cancel:
self._quarter_hour_timer_cancel()
self._quarter_hour_timer_cancel = None
def _has_existing_main_coordinator(self) -> bool:
"""Check if there's already a main coordinator in hass.data."""
@ -179,7 +251,7 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
) as err:
# Use cached data as fallback if available
if self._cached_price_data is not None:
_LOGGER.warning("API error, using cached data: %s", err)
self._log("warning", "API error, using cached data: %s", err)
return self._merge_cached_data()
msg = f"Error communicating with API: {err}"
raise UpdateFailed(msg) from err
@ -199,16 +271,17 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
# Transform for main entry: provide aggregated view
return self._transform_data_for_main_entry(raw_data)
# Use cached data
# Use cached data if available
if self._cached_price_data is not None:
return self._transform_data_for_main_entry(self._cached_price_data)
# No cached data, fetch new
raw_data = await self._fetch_all_homes_data()
self._cached_price_data = raw_data
self._last_price_update = current_time
await self._store_cache()
return self._transform_data_for_main_entry(raw_data)
# Fallback: no cache and no update needed (shouldn't happen)
self._log("warning", "No cached data available and update not triggered - returning empty data")
return {
"timestamp": current_time,
"homes": {},
"priceInfo": {},
}
async def _handle_subentry_update(self) -> dict[str, Any]:
"""Handle update for subentry - get data from main coordinator."""
@ -217,7 +290,7 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
async def _fetch_all_homes_data(self) -> dict[str, Any]:
"""Fetch data for all homes (main coordinator only)."""
_LOGGER.debug("Fetching data for all homes")
self._log("debug", "Fetching data for all homes")
# Get price data for all homes
price_data = await self.api.async_get_price_info()
@ -226,6 +299,8 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
homes_list = price_data.get("homes", {})
for home_id, home_price_data in homes_list.items():
# Store raw price data without enrichment
# Enrichment will be done dynamically when data is transformed
home_data = {
"price_info": home_price_data,
}
@ -276,19 +351,21 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
self._last_price_update = dt_util.parse_datetime(last_price_update)
if last_user_update := stored.get("last_user_update"):
self._last_user_update = dt_util.parse_datetime(last_user_update)
if last_midnight_check := stored.get("last_midnight_check"):
self._last_midnight_check = dt_util.parse_datetime(last_midnight_check)
# Validate cache: check if price data is from a previous day
if not self._is_cache_valid():
_LOGGER.info("Cached price data is from a previous day, clearing cache to fetch fresh data")
self._log("info", "Cached price data is from a previous day, clearing cache to fetch fresh data")
self._cached_price_data = None
self._last_price_update = None
await self._store_cache()
else:
_LOGGER.debug("Cache loaded successfully")
self._log("debug", "Cache loaded successfully")
else:
_LOGGER.debug("No cache found, will fetch fresh data")
self._log("debug", "No cache found, will fetch fresh data")
except OSError as ex:
_LOGGER.warning("Failed to load cache: %s", ex)
self._log("warning", "Failed to load cache: %s", ex)
def _is_cache_valid(self) -> bool:
"""
@ -307,7 +384,8 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
last_update_local_date = dt_util.as_local(self._last_price_update).date()
if current_local_date != last_update_local_date:
_LOGGER.debug(
self._log(
"debug",
"Cache date mismatch: cached=%s, current=%s",
last_update_local_date,
current_local_date,
@ -350,11 +428,12 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
prices_need_rotation = first_today_price_date < current_local_date
if prices_need_rotation:
_LOGGER.info("Performing midnight turnover: today→yesterday, tomorrow→today")
self._log("info", "Performing midnight turnover: today→yesterday, tomorrow→today")
return {
"yesterday": today_prices,
"today": tomorrow_prices,
"tomorrow": [],
"currency": price_info.get("currency", "EUR"),
}
return price_info
@ -366,11 +445,12 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
"user_data": self._cached_user_data,
"last_price_update": (self._last_price_update.isoformat() if self._last_price_update else None),
"last_user_update": (self._last_user_update.isoformat() if self._last_user_update else None),
"last_midnight_check": (self._last_midnight_check.isoformat() if self._last_midnight_check else None),
}
try:
await self._store.async_save(data)
_LOGGER.debug("Cache stored successfully")
self._log("debug", "Cache stored successfully")
except OSError:
_LOGGER.exception("Failed to store cache")
@ -378,39 +458,103 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
"""Update user data if needed (daily check)."""
if self._last_user_update is None or current_time - self._last_user_update >= self._user_update_interval:
try:
_LOGGER.debug("Updating user data")
self._log("debug", "Updating user data")
user_data = await self.api.async_get_viewer_details()
self._cached_user_data = user_data
self._last_user_update = current_time
_LOGGER.debug("User data updated successfully")
self._log("debug", "User data updated successfully")
except (
TibberPricesApiClientError,
TibberPricesApiClientCommunicationError,
) as ex:
_LOGGER.warning("Failed to update user data: %s", ex)
self._log("warning", "Failed to update user data: %s", ex)
@callback
def _should_update_price_data(self, current_time: datetime) -> bool:
"""Check if price data should be updated."""
"""
Check if price data should be updated from the API.
Updates occur when:
1. No cached data exists
2. Cache is invalid (from previous day)
3. It's after 13:00 local time and tomorrow's data is missing or invalid
4. Regular update interval has passed
"""
if self._cached_price_data is None:
_LOGGER.debug("Should update: No cached price data")
self._log("debug", "Should update: No cached price data")
return True
if self._last_price_update is None:
_LOGGER.debug("Should update: No last price update timestamp")
self._log("debug", "Should update: No last price update timestamp")
return True
time_since_update = current_time - self._last_price_update
should_update = time_since_update >= UPDATE_INTERVAL
now_local = dt_util.as_local(current_time)
tomorrow_date = (now_local + timedelta(days=1)).date()
_LOGGER.debug(
"Should update price data: %s (time since last update: %s, interval: %s)",
# Check if after 13:00 and tomorrow data is missing or invalid
if (
now_local.hour >= TOMORROW_DATA_CHECK_HOUR
and self._cached_price_data
and "homes" in self._cached_price_data
and self._needs_tomorrow_data(tomorrow_date)
):
self._log("debug", "Should update: After %s:00 and valid tomorrow data missing", TOMORROW_DATA_CHECK_HOUR)
return True
# Check regular update interval
time_since_update = current_time - self._last_price_update
# Determine appropriate interval based on data completeness
has_tomorrow_data = self._has_valid_tomorrow_data(tomorrow_date)
interval = UPDATE_INTERVAL_COMPLETE if has_tomorrow_data else UPDATE_INTERVAL
should_update = time_since_update >= interval
if should_update:
self._log(
"debug",
"Should update price data: %s (time since last update: %s, interval: %s, has_tomorrow: %s)",
should_update,
time_since_update,
UPDATE_INTERVAL,
interval,
has_tomorrow_data,
)
return should_update
def _needs_tomorrow_data(self, tomorrow_date: date) -> bool:
"""Check if tomorrow data is missing or invalid."""
if not self._cached_price_data or "homes" not in self._cached_price_data:
return False
for home_data in self._cached_price_data["homes"].values():
price_info = home_data.get("price_info", {})
tomorrow_prices = price_info.get("tomorrow", [])
# Check if tomorrow data is missing
if not tomorrow_prices:
return True
# Check if tomorrow data is actually for tomorrow (validate date)
first_price = tomorrow_prices[0]
if starts_at := first_price.get("startsAt"):
price_time = dt_util.parse_datetime(starts_at)
if price_time:
price_date = dt_util.as_local(price_time).date()
if price_date != tomorrow_date:
self._log(
"debug",
"Tomorrow data has wrong date: expected=%s, actual=%s",
tomorrow_date,
price_date,
)
return True
return False
def _has_valid_tomorrow_data(self, tomorrow_date: date) -> bool:
"""Check if we have valid tomorrow data (inverse of _needs_tomorrow_data)."""
return not self._needs_tomorrow_data(tomorrow_date)
@callback
def _merge_cached_data(self) -> dict[str, Any]:
"""Merge cached data into the expected format for main entry."""
@ -445,10 +589,15 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
# Perform midnight turnover if needed (handles day transitions)
price_info = self._perform_midnight_turnover(price_info)
# Get threshold percentages for enrichment
thresholds = self._get_threshold_percentages()
# Ensure all required keys exist (API might not return tomorrow data yet)
price_info.setdefault("yesterday", [])
price_info.setdefault("today", [])
price_info.setdefault("tomorrow", [])
price_info.setdefault("currency", "EUR")
# Enrich price info with calculated differences (trailing 24h averages)
# Enrich price info dynamically with calculated differences and rating levels
# This ensures enrichment is always up-to-date, especially after midnight turnover
thresholds = self._get_threshold_percentages()
price_info = enrich_price_info_with_differences(
price_info,
threshold_low=thresholds["low"],
@ -481,10 +630,15 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
# Perform midnight turnover if needed (handles day transitions)
price_info = self._perform_midnight_turnover(price_info)
# Get threshold percentages for enrichment
thresholds = self._get_threshold_percentages()
# Ensure all required keys exist (API might not return tomorrow data yet)
price_info.setdefault("yesterday", [])
price_info.setdefault("today", [])
price_info.setdefault("tomorrow", [])
price_info.setdefault("currency", "EUR")
# Enrich price info with calculated differences (trailing 24h averages)
# Enrich price info dynamically with calculated differences and rating levels
# This ensures enrichment is always up-to-date, especially after midnight turnover
thresholds = self._get_threshold_percentages()
price_info = enrich_price_info_with_differences(
price_info,
threshold_low=thresholds["low"],

View file

@ -19,57 +19,8 @@ class TibberPricesEntity(CoordinatorEntity[TibberPricesDataUpdateCoordinator]):
"""Initialize."""
super().__init__(coordinator)
# Get user profile information from coordinator
user_profile = self.coordinator.get_user_profile()
# Check if this is a main entry or subentry
is_subentry = bool(self.coordinator.config_entry.data.get("home_id"))
# Initialize variables
home_name = "Tibber Home"
home_id = self.coordinator.config_entry.unique_id
home_type = None
if is_subentry:
# For subentries, show specific home information
home_data = self.coordinator.config_entry.data.get("home_data", {})
home_id = self.coordinator.config_entry.data.get("home_id")
# Get home details
address = home_data.get("address", {})
address1 = address.get("address1", "")
city = address.get("city", "")
app_nickname = home_data.get("appNickname", "")
home_type = home_data.get("type", "")
# Compose home name
home_name = app_nickname or address1 or f"Tibber Home {home_id}"
if city:
home_name = f"{home_name}, {city}"
# Add user information if available
if user_profile and user_profile.get("name"):
home_name = f"{home_name} ({user_profile['name']})"
elif user_profile:
# For main entry, show user profile information
user_name = user_profile.get("name", "Tibber User")
user_email = user_profile.get("email", "")
home_name = f"Tibber - {user_name}"
if user_email:
home_name = f"{home_name} ({user_email})"
elif coordinator.data:
# Fallback to original logic if user data not available yet
try:
address1 = str(coordinator.data.get("address", {}).get("address1", ""))
city = str(coordinator.data.get("address", {}).get("city", ""))
app_nickname = str(coordinator.data.get("appNickname", ""))
home_type = str(coordinator.data.get("type", ""))
# Compose a nice name
home_name = "Tibber " + (app_nickname or address1 or "Home")
if city:
home_name = f"{home_name}, {city}"
except (KeyError, IndexError, TypeError):
home_name = "Tibber Home"
# Get device information
home_name, home_id, home_type = self._get_device_info()
# Get translated home type using the configured language
language = coordinator.hass.config.language or "en"
@ -90,3 +41,83 @@ class TibberPricesEntity(CoordinatorEntity[TibberPricesDataUpdateCoordinator]):
serial_number=home_id if home_id else None,
configuration_url="https://developer.tibber.com/explorer",
)
def _get_device_info(self) -> tuple[str, str | None, str | None]:
"""Get device name, ID and type."""
user_profile = self.coordinator.get_user_profile()
is_subentry = bool(self.coordinator.config_entry.data.get("home_id"))
home_id = self.coordinator.config_entry.unique_id
home_type = None
if is_subentry:
home_name, home_id, home_type = self._get_subentry_device_info()
# Add user information if available
if user_profile and user_profile.get("name"):
home_name = f"{home_name} ({user_profile['name']})"
elif user_profile:
home_name = self._get_main_entry_device_info(user_profile)
else:
home_name, home_type = self._get_fallback_device_info()
return home_name, home_id, home_type
def _get_subentry_device_info(self) -> tuple[str, str | None, str | None]:
"""Get device info for subentry."""
home_data = self.coordinator.config_entry.data.get("home_data", {})
home_id = self.coordinator.config_entry.data.get("home_id")
# Get home details
address = home_data.get("address", {})
address1 = address.get("address1", "")
city = address.get("city", "")
app_nickname = home_data.get("appNickname", "")
home_type = home_data.get("type", "")
# Compose home name
if app_nickname and app_nickname.strip():
# If appNickname is set, use it as-is (don't add city)
home_name = app_nickname.strip()
elif address1:
# If no appNickname, use address and optionally add city
home_name = address1
if city:
home_name = f"{home_name}, {city}"
else:
# Fallback to home ID
home_name = f"Tibber Home {home_id}"
return home_name, home_id, home_type
def _get_main_entry_device_info(self, user_profile: dict) -> str:
"""Get device info for main entry."""
user_name = user_profile.get("name", "Tibber User")
user_email = user_profile.get("email", "")
home_name = f"Tibber - {user_name}"
if user_email:
home_name = f"{home_name} ({user_email})"
return home_name
def _get_fallback_device_info(self) -> tuple[str, str | None]:
"""Get fallback device info if user data not available yet."""
if not self.coordinator.data:
return "Tibber Home", None
try:
address1 = str(self.coordinator.data.get("address", {}).get("address1", ""))
city = str(self.coordinator.data.get("address", {}).get("city", ""))
app_nickname = str(self.coordinator.data.get("appNickname", ""))
home_type = str(self.coordinator.data.get("type", ""))
# Compose a nice name
if app_nickname and app_nickname.strip():
home_name = f"Tibber {app_nickname.strip()}"
elif address1:
home_name = f"Tibber {address1}"
if city:
home_name = f"{home_name}, {city}"
else:
home_name = "Tibber Home"
except (KeyError, IndexError, TypeError):
return "Tibber Home", None
else:
return home_name, home_type

View file

@ -1027,7 +1027,7 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity):
"interval_start": starts_at.isoformat(),
"interval_end": interval_end.isoformat(),
"price": float(price_data["total"]),
"price_cents": round(float(price_data["total"]) * 100, 2),
"price_minor": round(float(price_data["total"]) * 100, 2),
"level": price_data.get("level", "NORMAL"),
"rating": price_data.get("difference", None),
"rating_level": price_data.get("rating_level"),
@ -1076,7 +1076,7 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity):
interval_data = {
"minute": starts_at.minute,
"price": interval["price"],
"price_cents": interval["price_cents"],
"price_minor": interval["price_minor"],
"level": interval["level"], # Price level from priceInfo
"time": starts_at.strftime("%H:%M"),
}
@ -1408,12 +1408,15 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity):
else:
# Fallback: use the first timestamp of the appropriate day
day_key = "tomorrow" if "tomorrow" in key else "today"
attributes["timestamp"] = price_info.get(day_key, [{}])[0].get("startsAt")
day_data = price_info.get(day_key, [])
if day_data:
attributes["timestamp"] = day_data[0].get("startsAt")
else:
# Fallback: use the first timestamp of the appropriate day
day_key = "tomorrow" if "tomorrow" in key else "today"
first_timestamp = price_info.get(day_key, [{}])[0].get("startsAt")
attributes["timestamp"] = first_timestamp
day_data = price_info.get(day_key, [])
if day_data:
attributes["timestamp"] = day_data[0].get("startsAt")
def _add_average_price_attributes(self, attributes: dict) -> None:
"""Add attributes for trailing and leading average price sensors."""

View file

@ -68,17 +68,35 @@
"options": {
"step": {
"init": {
"description": "Benutzer: {user_login}",
"title": "Allgemeine Einstellungen",
"description": "Konfiguration allgemeiner Einstellungen für Tibber Preisinformationen & Bewertungen.\n\nBenutzer: {user_login}",
"data": {
"access_token": "API-Zugriffstoken",
"extended_descriptions": "Erweiterte Beschreibungen in Entitätsattributen anzeigen",
"best_price_flex": "Flexibilität für Bestpreis (%)",
"peak_price_flex": "Flexibilität für Spitzenpreis (%)",
"price_rating_threshold_low": "Preisbewertungs-Schwellenwert Niedrig (% vs. Durchschnitt)",
"price_rating_threshold_high": "Preisbewertungs-Schwellenwert Hoch (% vs. Durchschnitt)"
"extended_descriptions": "Erweiterte Beschreibungen in Entity-Attributen anzeigen"
}
},
"title": "Optionen für Tibber Preisinformationen & Bewertungen",
"submit": "Optionen speichern"
"price_rating": {
"title": "Preisbewertungs-Schwellwerte",
"description": "Konfiguration der Schwellwerte für Preisbewertungsstufen (NIEDRIG/NORMAL/HOCH) basierend auf dem Vergleich mit dem gleitenden 24-Stunden-Durchschnitt.",
"data": {
"price_rating_threshold_low": "Schwellwert für niedrige Bewertung (% unter gleitendem Durchschnitt)",
"price_rating_threshold_high": "Schwellwert für hohe Bewertung (% über gleitendem Durchschnitt)"
}
},
"best_price": {
"title": "Bestpreis-Periode Einstellungen",
"description": "Konfiguration für den Bestpreis-Periode Binärsensor. Dieser Sensor ist während der Zeiträume mit den niedrigsten Strompreisen aktiv.",
"data": {
"best_price_flex": "Flexibilität: Maximale % über dem Mindestpreis",
"best_price_min_distance_from_avg": "Mindestabstand: Erforderliche % unter dem Tagesdurchschnitt"
}
},
"peak_price": {
"title": "Spitzenpreis-Periode Einstellungen",
"description": "Konfiguration für den Spitzenpreis-Periode Binärsensor. Dieser Sensor ist während der Zeiträume mit den höchsten Strompreisen aktiv.",
"data": {
"peak_price_flex": "Flexibilität: Maximale % unter dem Höchstpreis (negativer Wert)",
"peak_price_min_distance_from_avg": "Mindestabstand: Erforderliche % über dem Tagesdurchschnitt"
}
}
},
"error": {

View file

@ -68,17 +68,35 @@
"options": {
"step": {
"init": {
"description": "User: {user_login}",
"title": "General Settings",
"description": "Configure general settings for Tibber Price Information & Ratings.\n\nUser: {user_login}",
"data": {
"access_token": "API access token",
"extended_descriptions": "Show extended descriptions in entity attributes",
"best_price_flex": "Best Price Flexibility (%)",
"peak_price_flex": "Peak Price Flexibility (%)",
"price_rating_threshold_low": "Price Rating Threshold Low (% vs trailing average)",
"price_rating_threshold_high": "Price Rating Threshold High (% vs trailing average)"
"extended_descriptions": "Show extended descriptions in entity attributes"
}
},
"title": "Options for Tibber Price Information & Ratings",
"submit": "Save Options"
"price_rating": {
"title": "Price Rating Thresholds",
"description": "Configure thresholds for price rating levels (LOW/NORMAL/HIGH) based on comparison with trailing 24-hour average.",
"data": {
"price_rating_threshold_low": "Low Rating Threshold (% below trailing average)",
"price_rating_threshold_high": "High Rating Threshold (% above trailing average)"
}
},
"best_price": {
"title": "Best Price Period Settings",
"description": "Configure settings for the Best Price Period binary sensor. This sensor is active during periods with the lowest electricity prices.",
"data": {
"best_price_flex": "Flexibility: Maximum % above minimum price",
"best_price_min_distance_from_avg": "Minimum Distance: Required % below daily average"
}
},
"peak_price": {
"title": "Peak Price Period Settings",
"description": "Configure settings for the Peak Price Period binary sensor. This sensor is active during periods with the highest electricity prices.",
"data": {
"peak_price_flex": "Flexibility: Maximum % below maximum price (negative value)",
"peak_price_min_distance_from_avg": "Minimum Distance: Required % above daily average"
}
}
},
"error": {

View file

@ -1,22 +1,28 @@
"""Test basic coordinator functionality with the enhanced coordinator."""
"""Test basic coordinator functions."""
from __future__ import annotations
import asyncio # noqa: TC003
from typing import TYPE_CHECKING
from unittest.mock import AsyncMock, Mock, patch
import pytest
from custom_components.tibber_prices.coordinator import (
TibberPricesDataUpdateCoordinator,
)
if TYPE_CHECKING:
from collections.abc import Generator
from custom_components.tibber_prices.coordinator import TibberPricesDataUpdateCoordinator
class TestBasicCoordinator:
"""Test basic coordinator functionality."""
"""Test basic coordinator operations."""
@pytest.fixture
def mock_hass(self) -> Mock:
def mock_hass(self, event_loop: asyncio.AbstractEventLoop) -> Mock:
"""Create a mock Home Assistant instance."""
hass = Mock()
hass.data = {}
hass.loop = event_loop
return hass
@pytest.fixture
@ -26,6 +32,7 @@ class TestBasicCoordinator:
config_entry.unique_id = "test_home_123"
config_entry.entry_id = "test_entry"
config_entry.data = {"access_token": "test_token"}
config_entry.title = "Test Home"
return config_entry
@pytest.fixture
@ -36,7 +43,7 @@ class TestBasicCoordinator:
@pytest.fixture
def coordinator(
self, mock_hass: Mock, mock_config_entry: Mock, mock_session: Mock
) -> TibberPricesDataUpdateCoordinator:
) -> Generator[TibberPricesDataUpdateCoordinator]:
"""Create a coordinator instance."""
with (
patch(
@ -50,12 +57,20 @@ class TestBasicCoordinator:
mock_store.async_save = AsyncMock()
mock_store_class.return_value = mock_store
return TibberPricesDataUpdateCoordinator(mock_hass, mock_config_entry)
coord = TibberPricesDataUpdateCoordinator(mock_hass, mock_config_entry)
# Ensure cleanup after test
yield coord
# Clean up the timer
if coord._quarter_hour_timer_cancel: # noqa: SLF001
coord._quarter_hour_timer_cancel() # noqa: SLF001
coord._quarter_hour_timer_cancel = None # noqa: SLF001
def test_coordinator_creation(self, coordinator: TibberPricesDataUpdateCoordinator) -> None:
"""Test that coordinator can be created."""
assert coordinator is not None # noqa: S101
assert hasattr(coordinator, "get_current_interval_data") # noqa: S101
assert hasattr(coordinator, "get_current_interval") # noqa: S101
assert hasattr(coordinator, "get_all_intervals") # noqa: S101
assert hasattr(coordinator, "get_user_profile") # noqa: S101
@ -76,7 +91,7 @@ class TestBasicCoordinator:
def test_get_current_interval_data_no_data(self, coordinator: TibberPricesDataUpdateCoordinator) -> None:
"""Test getting current interval data when no data is available."""
current_data = coordinator.get_current_interval_data()
current_data = coordinator.get_current_interval()
assert current_data is None # noqa: S101
def test_get_all_intervals_no_data(self, coordinator: TibberPricesDataUpdateCoordinator) -> None:

View file

@ -3,9 +3,11 @@
from __future__ import annotations
import asyncio
from typing import TYPE_CHECKING
from unittest.mock import AsyncMock, Mock, patch
import pytest
import pytest_asyncio
from custom_components.tibber_prices.api import TibberPricesApiClientCommunicationError
from custom_components.tibber_prices.const import DOMAIN
@ -13,6 +15,9 @@ from custom_components.tibber_prices.coordinator import (
TibberPricesDataUpdateCoordinator,
)
if TYPE_CHECKING:
from collections.abc import AsyncGenerator
class TestEnhancedCoordinator:
"""Test enhanced coordinator functionality."""
@ -24,6 +29,7 @@ class TestEnhancedCoordinator:
config_entry.unique_id = "test_home_id_123"
config_entry.entry_id = "test_entry_id"
config_entry.data = {"access_token": "test_token"}
config_entry.options = {} # Add options dict for threshold lookups
return config_entry
@pytest.fixture
@ -54,10 +60,10 @@ class TestEnhancedCoordinator:
api.async_get_monthly_price_rating = AsyncMock(return_value={"homes": {}})
return api
@pytest.fixture
def coordinator(
@pytest_asyncio.fixture
async def coordinator(
self, mock_hass: Mock, mock_config_entry: Mock, mock_store: Mock, mock_api: Mock
) -> TibberPricesDataUpdateCoordinator:
) -> AsyncGenerator[TibberPricesDataUpdateCoordinator]:
"""Create a coordinator for testing."""
mock_session = Mock()
with (
@ -76,7 +82,12 @@ class TestEnhancedCoordinator:
)
# Replace the API instance with our mock
coordinator.api = mock_api
return coordinator
# Yield for testing
yield coordinator
# Clean up timer on teardown
await coordinator.async_shutdown()
@pytest.mark.asyncio
async def test_main_subentry_pattern(self, mock_hass: Mock, mock_store: Mock) -> None:
@ -86,6 +97,7 @@ class TestEnhancedCoordinator:
main_config_entry.unique_id = "main_home_id"
main_config_entry.entry_id = "main_entry_id"
main_config_entry.data = {"access_token": "test_token"}
main_config_entry.options = {} # Add options dict for threshold lookups
mock_session = Mock()
with (
@ -111,6 +123,7 @@ class TestEnhancedCoordinator:
sub_config_entry.unique_id = "sub_home_id"
sub_config_entry.entry_id = "sub_entry_id"
sub_config_entry.data = {"access_token": "test_token", "home_id": "sub_home_id"}
sub_config_entry.options = {} # Add options dict for threshold lookups
# Set up domain data to simulate main coordinator being already registered
mock_hass.data[DOMAIN] = {"main_entry_id": main_coordinator}
@ -133,6 +146,10 @@ class TestEnhancedCoordinator:
# Verify subentry coordinator is not marked as main entry
assert not sub_coordinator.is_main_entry() # noqa: S101
# Clean up coordinators
await main_coordinator.async_shutdown()
await sub_coordinator.async_shutdown()
@pytest.mark.asyncio
async def test_user_data_functionality(self, coordinator: TibberPricesDataUpdateCoordinator) -> None:
"""Test user data related functionality."""