From 3d37ace85e58680eb8efb2b18c656e338235a738 Mon Sep 17 00:00:00 2001 From: Julian Pawlowski Date: Tue, 20 May 2025 12:43:12 +0000 Subject: [PATCH] refactoring --- .../tibber_prices/coordinator.py | 313 ++++++++++-------- 1 file changed, 183 insertions(+), 130 deletions(-) diff --git a/custom_components/tibber_prices/coordinator.py b/custom_components/tibber_prices/coordinator.py index 3a5b220..42af80c 100644 --- a/custom_components/tibber_prices/coordinator.py +++ b/custom_components/tibber_prices/coordinator.py @@ -4,7 +4,8 @@ from __future__ import annotations import asyncio import logging -from datetime import datetime, timedelta +import secrets +from datetime import date, datetime, timedelta from typing import TYPE_CHECKING, Any, Final, cast import homeassistant.util.dt as dt_util @@ -14,6 +15,11 @@ from homeassistant.helpers.event import async_track_time_change from homeassistant.helpers.storage import Store from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed +if TYPE_CHECKING: + from collections.abc import Callable + + from .data import TibberPricesConfigEntry + from .api import ( TibberPricesApiClientAuthenticationError, TibberPricesApiClientCommunicationError, @@ -21,9 +27,6 @@ from .api import ( ) from .const import DOMAIN, LOGGER -if TYPE_CHECKING: - from .data import TibberPricesConfigEntry - _LOGGER = logging.getLogger(__name__) PRICE_UPDATE_RANDOM_MIN_HOUR: Final = 13 # Don't check before 13:00 @@ -34,6 +37,8 @@ STORAGE_VERSION: Final = 1 UPDATE_INTERVAL: Final = timedelta(days=1) # Both price and rating data update daily UPDATE_FAILED_MSG: Final = "Update failed" AUTH_FAILED_MSG: Final = "Authentication failed" +MIN_RETRY_INTERVAL: Final = timedelta(minutes=10) +END_OF_DAY_HOUR: Final = 24 # End of day hour for logic clarity @callback @@ -130,6 +135,9 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict]): self._remove_update_listeners: list[Any] = [] self._force_update = False self._rotation_lock = asyncio.Lock() # Add lock for data rotation operations + self._last_attempted_price_update: datetime | None = None + self._random_update_minute: int | None = None + self._random_update_date: date | None = None # Schedule updates at the start of every hour self._remove_update_listeners.append( @@ -295,7 +303,7 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict]): try: # Fetch price data price_data = await self._fetch_price_data() - new_data["price_data"] = self._extract_price_data(price_data) + new_data["price_data"] = self._extract_data(price_data, "priceInfo", ("yesterday", "today", "tomorrow")) # Fetch all rating data for rating_type in ["hourly", "daily", "monthly"]: @@ -384,56 +392,61 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict]): @callback def _should_update_price_data(self, current_time: datetime) -> bool: - """Check if price data should be updated.""" - # If no cached data, we definitely need an update - if self._cached_price_data is None: - LOGGER.debug("No cached price data available, update needed") - return True + """ + Decide if price data should be updated. - # Get the latest timestamp from our price data - latest_price_timestamp = _get_latest_timestamp_from_prices(self._cached_price_data) - if not latest_price_timestamp: - LOGGER.debug("No valid timestamp found in price data, update needed") - return True - - # If we have price data but no last_update timestamp, set it - if not self._last_price_update: - self._last_price_update = latest_price_timestamp - LOGGER.debug( - "Setting missing price update timestamp in check: %s", - self._last_price_update, - ) - - # Check if we're in the update window (13:00-15:00) + - No fetch before 13:00. + - Randomized fetch minute in update window (13:00-15:00). + - Always fetch after 15:00 if tomorrow's data is missing. + - No fetch after midnight until 13:00. + """ current_hour = current_time.hour - in_update_window = PRICE_UPDATE_RANDOM_MIN_HOUR <= current_hour <= PRICE_UPDATE_RANDOM_MAX_HOUR + # Check if tomorrow's data is available + tomorrow_prices = [] + if self._cached_price_data and "priceInfo" in self._cached_price_data: + tomorrow_prices = self._cached_price_data["priceInfo"].get("tomorrow", []) + interval_count = len(tomorrow_prices) + min_tomorrow_intervals_hourly = 24 + min_tomorrow_intervals_15min = 96 + tomorrow_data_complete = interval_count in {min_tomorrow_intervals_hourly, min_tomorrow_intervals_15min} - # Get tomorrow's date at midnight - tomorrow = (current_time + timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0) - - # If we're in the update window and don't have tomorrow's complete data - if in_update_window and latest_price_timestamp < tomorrow: - LOGGER.debug( - "In update window (%d:00) and latest price timestamp (%s) is before tomorrow, update needed", - current_hour, - latest_price_timestamp, - ) - return True - - # If it's been more than 24 hours since our last update - if self._last_price_update and current_time - self._last_price_update >= UPDATE_INTERVAL: - LOGGER.debug( - "More than 24 hours since last price update (%s), update needed", - self._last_price_update, - ) - return True - - LOGGER.debug( - "No price update needed - Last update: %s, Latest data: %s", - self._last_price_update, - latest_price_timestamp, - ) - return False + should_update = False + # 1. Before 13:00: never fetch + if current_hour < PRICE_UPDATE_RANDOM_MIN_HOUR: + should_update = False + # 2. In update window (13:00-15:00): fetch at random minute, with min retry interval + elif PRICE_UPDATE_RANDOM_MIN_HOUR <= current_hour < PRICE_UPDATE_RANDOM_MAX_HOUR: + today = current_time.date() + if self._random_update_date != today or self._random_update_minute is None: + self._random_update_date = today + self._random_update_minute = secrets.randbelow(RANDOM_DELAY_MAX_MINUTES) + # Only fetch at the random minute + if current_time.minute == self._random_update_minute: + # Enforce minimum retry interval + if self._last_attempted_price_update: + since_last = current_time - self._last_attempted_price_update + if since_last < MIN_RETRY_INTERVAL: + LOGGER.debug( + "Skipping price update: last attempt was %s ago (<%s)", + since_last, + MIN_RETRY_INTERVAL, + ) + should_update = False + else: + self._last_attempted_price_update = current_time + should_update = not tomorrow_data_complete + else: + self._last_attempted_price_update = current_time + should_update = not tomorrow_data_complete + else: + should_update = False + # 3. After update window (15:00-00:00): always fetch if tomorrow's data is missing + elif PRICE_UPDATE_RANDOM_MAX_HOUR <= current_hour < END_OF_DAY_HOUR: + should_update = not tomorrow_data_complete + # 4. After midnight until 13:00: never fetch + else: + should_update = False + return should_update @callback def _should_update_rating_type( @@ -443,69 +456,62 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict]): last_update: datetime | None, rating_type: str, ) -> bool: - """Check if specific rating type should be updated.""" - # If no cached data, we definitely need an update - if cached_data is None: - LOGGER.debug("No cached %s rating data available, update needed", rating_type) - return True - - # Get the latest timestamp from our rating data - latest_timestamp = self._get_latest_timestamp_from_rating_type(cached_data, rating_type) - if not latest_timestamp: - LOGGER.debug("No valid timestamp found in %s rating data, update needed", rating_type) - return True - - # If we have rating data but no last_update timestamp, set it - if not last_update: - if rating_type == "hourly": - self._last_rating_update_hourly = latest_timestamp - elif rating_type == "daily": - self._last_rating_update_daily = latest_timestamp - else: # monthly - self._last_rating_update_monthly = latest_timestamp - LOGGER.debug( - "Setting missing %s rating timestamp in check: %s", - rating_type, - latest_timestamp, - ) - last_update = latest_timestamp - - current_hour = current_time.hour - in_update_window = PRICE_UPDATE_RANDOM_MIN_HOUR <= current_hour <= PRICE_UPDATE_RANDOM_MAX_HOUR - should_update = False + def extra_check_monthly(now: datetime, latest: datetime) -> bool: + current_month_start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0) + return latest < current_month_start if rating_type == "monthly": - current_month_start = current_time.replace(day=1, hour=0, minute=0, second=0, microsecond=0) - should_update = latest_timestamp < current_month_start or ( - last_update and current_time - last_update >= timedelta(days=1) - ) - else: - tomorrow = (current_time + timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0) - should_update = ( - in_update_window and latest_timestamp < tomorrow - ) or current_time - last_update >= UPDATE_INTERVAL - - if should_update: - LOGGER.debug( - "Update needed for %s rating data - Last update: %s, Latest data: %s", - rating_type, + return self._should_update_data( + current_time, + cached_data, last_update, - latest_timestamp, + lambda d: self._get_latest_rating_timestamp(d, rating_type), + config={ + "interval": timedelta(days=1), + "extra_check": extra_check_monthly, + }, ) - else: - LOGGER.debug( - "No %s rating update needed - Last update: %s, Latest data: %s", - rating_type, - last_update, - latest_timestamp, - ) - - return should_update + return self._should_update_data( + current_time, + cached_data, + last_update, + lambda d: self._get_latest_rating_timestamp(d, rating_type), + config={ + "update_window": (PRICE_UPDATE_RANDOM_MIN_HOUR, PRICE_UPDATE_RANDOM_MAX_HOUR), + "interval": UPDATE_INTERVAL, + }, + ) @callback - def _is_price_update_window(self, current_hour: int) -> bool: - """Check if current hour is within price update window.""" - return PRICE_UPDATE_RANDOM_MIN_HOUR <= current_hour <= PRICE_UPDATE_RANDOM_MAX_HOUR + def _should_update_data( + self, + current_time: datetime, + cached_data: dict | None, + last_update: datetime | None, + timestamp_func: Callable[[dict | None], datetime | None], + config: dict | None = None, + ) -> bool: + """Generalized update check for any data type.""" + config = config or {} + update_window = config.get("update_window") + interval = config.get("interval", UPDATE_INTERVAL) + extra_check = config.get("extra_check") + if cached_data is None: + return True + latest_timestamp = timestamp_func(cached_data) + if not latest_timestamp: + return True + if not last_update: + last_update = latest_timestamp + if update_window: + current_hour = current_time.hour + if update_window[0] <= current_hour <= update_window[1]: + tomorrow = (current_time + timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0) + if latest_timestamp < tomorrow: + return True + if last_update and current_time - last_update >= interval: + return True + return extra_check(current_time, latest_timestamp) if extra_check else False async def _fetch_price_data(self) -> dict: """Fetch fresh price data from API.""" @@ -513,19 +519,15 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict]): return await client.async_get_price_info() @callback - def _extract_price_data(self, data: dict) -> dict: - """Extract price data for caching in flat format.""" + def _extract_data(self, data: dict, container_key: str, keys: tuple[str, ...]) -> dict: + """Extract and harmonize data for caching in flat format.""" try: - price_info = data["priceInfo"] - extracted_price_info = { - "yesterday": price_info.get("yesterday", []), - "today": price_info.get("today", []), - "tomorrow": price_info.get("tomorrow", []), - } + container = data[container_key] + extracted = {key: list(container.get(key, [])) for key in keys} except (KeyError, IndexError, TypeError) as ex: - LOGGER.error("Error extracting price data: %s", ex) - extracted_price_info = {"yesterday": [], "today": [], "tomorrow": []} - return extracted_price_info + LOGGER.error("Error extracting %s data: %s", container_key, ex) + extracted = {key: [] for key in keys} + return extracted @callback def _get_latest_timestamp_from_rating_type(self, rating_data: dict | None, rating_type: str) -> datetime | None: @@ -550,14 +552,16 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict]): async def _get_rating_data_for_type(self, rating_type: str) -> dict: """Get fresh rating data for a specific type in flat format.""" client = self.config_entry.runtime_data.client - if rating_type == "hourly": - data = await client.async_get_hourly_price_rating() - elif rating_type == "daily": - data = await client.async_get_daily_price_rating() - else: - data = await client.async_get_monthly_price_rating() - - # Accept both {"priceRating": {...}} and flat {rating_type: [...], ...} dicts + method_map = { + "hourly": client.async_get_hourly_price_rating, + "daily": client.async_get_daily_price_rating, + "monthly": client.async_get_monthly_price_rating, + } + fetch_method = method_map.get(rating_type) + if not fetch_method: + msg = f"Unknown rating type: {rating_type}" + raise ValueError(msg) + data = await fetch_method() try: price_rating = data.get("priceRating", data) threshold = price_rating.get("thresholdPercentages") @@ -772,9 +776,9 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict]): return None if rating_type: - timestamp = self._get_latest_timestamp_from_rating_type(data, rating_type) + timestamp = self._get_latest_rating_timestamp(data, rating_type) else: - timestamp = _get_latest_timestamp_from_prices(data) + timestamp = self._get_latest_price_timestamp(data) if timestamp: LOGGER.debug( @@ -786,3 +790,52 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict]): return None return timestamp + + @callback + def _get_latest_timestamp( + self, + data: dict | None, + container_key: str, + entry_key: str | None = None, + time_field: str = "startsAt", + ) -> datetime | None: + """Get the latest timestamp from a container in data, optionally for a subkey and time field.""" + if not data or container_key not in data: + return None + try: + container = data[container_key] + if entry_key: + container = container.get(entry_key, []) + latest = None + for entry in container: + time_str = entry.get(time_field) + if time_str: + timestamp = dt_util.parse_datetime(time_str) + if timestamp and (not latest or timestamp > latest): + latest = timestamp + except (KeyError, IndexError, TypeError): + return None + return latest + + @callback + def _get_latest_price_timestamp(self, price_data: dict | None) -> datetime | None: + """Get the latest timestamp from price data (today and tomorrow).""" + # Check both today and tomorrow, return the latest + today = self._get_latest_timestamp(price_data, "priceInfo", "today", "startsAt") + tomorrow = self._get_latest_timestamp(price_data, "priceInfo", "tomorrow", "startsAt") + if today and tomorrow: + return max(today, tomorrow) + return today or tomorrow + + @callback + def _get_latest_rating_timestamp(self, rating_data: dict | None, rating_type: str | None = None) -> datetime | None: + """Get the latest timestamp from rating data, optionally for a specific type.""" + if not rating_type: + # Check all types and return the latest + latest = None + for rtype in ("hourly", "daily", "monthly"): + ts = self._get_latest_timestamp(rating_data, "priceRating", rtype, "time") + if ts and (not latest or ts > latest): + latest = ts + return latest + return self._get_latest_timestamp(rating_data, "priceRating", rating_type, "time")