refactoring

This commit is contained in:
Julian Pawlowski 2025-04-21 18:07:50 +00:00
parent 385ea6c1de
commit 5150ae3f48
3 changed files with 780 additions and 184 deletions

View file

@ -67,9 +67,13 @@ async def async_unload_entry(
hass: HomeAssistant, hass: HomeAssistant,
entry: TibberPricesConfigEntry, entry: TibberPricesConfigEntry,
) -> bool: ) -> bool:
"""Handle unload of an entry.""" """Unload a config entry."""
await entry.runtime_data.coordinator.shutdown() unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
return await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
if unload_ok and entry.runtime_data is not None:
await entry.runtime_data.coordinator.async_shutdown()
return unload_ok
async def async_remove_entry( async def async_remove_entry(

View file

@ -108,7 +108,19 @@ async def _verify_graphql_response(response_json: dict) -> None:
def _is_data_empty(data: dict, query_type: str) -> bool: def _is_data_empty(data: dict, query_type: str) -> bool:
"""Check if the response data is empty or incomplete.""" """Check if the response data is empty or incomplete.
For price info:
- Must have either range/edges or yesterday data
- Must have today data
- If neither range/edges nor yesterday data exists, data is considered empty
- If today data is empty, data is considered empty
- tomorrow can be empty if we have valid historical and today data
For rating data:
- Must have thresholdPercentages
- Must have non-empty entries for the specific rating type
"""
_LOGGER.debug("Checking if data is empty for query_type %s", query_type) _LOGGER.debug("Checking if data is empty for query_type %s", query_type)
try: try:
@ -116,30 +128,69 @@ def _is_data_empty(data: dict, query_type: str) -> bool:
if query_type == "price_info": if query_type == "price_info":
price_info = subscription["priceInfo"] price_info = subscription["priceInfo"]
# Check either range or yesterday, since we transform range into yesterday
has_range = "range" in price_info and price_info["range"]["edges"] # Check historical data (either range or yesterday)
has_yesterday = "yesterday" in price_info and price_info["yesterday"] has_range = (
"range" in price_info
and price_info["range"] is not None
and "edges" in price_info["range"]
and price_info["range"]["edges"]
)
has_yesterday = (
"yesterday" in price_info
and price_info["yesterday"] is not None
and len(price_info["yesterday"]) > 0
)
has_historical = has_range or has_yesterday has_historical = has_range or has_yesterday
is_empty = not has_historical or not price_info["today"]
# Check today's data
has_today = (
"today" in price_info
and price_info["today"] is not None
and len(price_info["today"]) > 0
)
# Data is empty if we don't have historical data or today's data
is_empty = not has_historical or not has_today
_LOGGER.debug( _LOGGER.debug(
"Price info check - historical data: %s, today: %s, is_empty: %s", "Price info check - historical data (range: %s, yesterday: %s), today: %s, is_empty: %s",
bool(has_historical), bool(has_range),
bool(price_info["today"]), bool(has_yesterday),
bool(has_today),
is_empty, is_empty,
) )
return is_empty return is_empty
if query_type in ["daily", "hourly", "monthly"]: if query_type in ["daily", "hourly", "monthly"]:
rating = subscription["priceRating"] rating = subscription["priceRating"]
if not rating["thresholdPercentages"]:
_LOGGER.debug("Missing threshold percentages for %s rating", query_type) # Check threshold percentages
has_thresholds = (
"thresholdPercentages" in rating
and rating["thresholdPercentages"] is not None
and "low" in rating["thresholdPercentages"]
and "high" in rating["thresholdPercentages"]
)
if not has_thresholds:
_LOGGER.debug("Missing or invalid threshold percentages for %s rating", query_type)
return True return True
entries = rating[query_type]["entries"]
is_empty = not entries or len(entries) == 0 # Check rating entries
has_entries = (
query_type in rating
and rating[query_type] is not None
and "entries" in rating[query_type]
and rating[query_type]["entries"] is not None
and len(rating[query_type]["entries"]) > 0
)
is_empty = not has_entries
_LOGGER.debug( _LOGGER.debug(
"%s rating check - entries count: %d, is_empty: %s", "%s rating check - has_thresholds: %s, entries count: %d, is_empty: %s",
query_type, query_type,
len(entries) if entries else 0, has_thresholds,
len(rating[query_type]["entries"]) if has_entries else 0,
is_empty, is_empty,
) )
return is_empty return is_empty
@ -149,8 +200,6 @@ def _is_data_empty(data: dict, query_type: str) -> bool:
except (KeyError, IndexError, TypeError) as error: except (KeyError, IndexError, TypeError) as error:
_LOGGER.debug("Error checking data emptiness: %s", error) _LOGGER.debug("Error checking data emptiness: %s", error)
return True return True
else:
return False
def _prepare_headers(access_token: str) -> dict[str, str]: def _prepare_headers(access_token: str) -> dict[str, str]:

View file

@ -1,47 +1,139 @@
"""DataUpdateCoordinator for tibber_prices.""" """Coordinator for fetching Tibber price data."""
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
import random import random
from datetime import datetime, timedelta from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Any, cast import logging
from typing import TYPE_CHECKING, Any, Final, TypedDict, cast
from homeassistant.core import HomeAssistant, callback
from homeassistant.exceptions import ConfigEntryAuthFailed from homeassistant.exceptions import ConfigEntryAuthFailed
from homeassistant.helpers.storage import Store from homeassistant.helpers.storage import Store
from homeassistant.helpers.update_coordinator import ( from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
DataUpdateCoordinator, import homeassistant.util.dt as dt_util
UpdateFailed,
)
from homeassistant.util import dt as dt_util
from .api import ( from .api import (
TibberPricesApiClientAuthenticationError, TibberPricesApiClient,
TibberPricesApiClientError, TibberPricesApiClientError,
TibberPricesApiClientAuthenticationError,
TibberPricesApiClientCommunicationError,
)
from .const import (
DOMAIN,
LOGGER,
) )
from .const import DOMAIN, LOGGER
if TYPE_CHECKING: if TYPE_CHECKING:
from homeassistant.core import HomeAssistant
from .data import TibberPricesConfigEntry from .data import TibberPricesConfigEntry
_LOGGER = logging.getLogger(__name__)
PRICE_UPDATE_RANDOM_MIN_HOUR = 13 # Don't check before 13:00 PRICE_UPDATE_RANDOM_MIN_HOUR: Final = 13 # Don't check before 13:00
PRICE_UPDATE_RANDOM_MAX_HOUR = 15 # Don't check after 15:00 PRICE_UPDATE_RANDOM_MAX_HOUR: Final = 15 # Don't check after 15:00
PRICE_UPDATE_INTERVAL = timedelta(days=1) RANDOM_DELAY_MAX_MINUTES: Final = 120 # Maximum random delay in minutes
RATING_UPDATE_INTERVAL = timedelta(hours=1) NO_DATA_ERROR_MSG: Final = "No data available"
NO_DATA_ERROR_MSG = "No data available" STORAGE_VERSION: Final = 1
STORAGE_VERSION = 1 UPDATE_INTERVAL: Final = timedelta(days=1) # Both price and rating data update daily
class TibberPricesPriceInfo(TypedDict):
"""Type for price info data structure."""
today: list[dict[str, Any]]
tomorrow: list[dict[str, Any]]
yesterday: list[dict[str, Any]]
class TibberPricesPriceRating(TypedDict):
"""Type for price rating data structure."""
thresholdPercentages: dict[str, float] | None
hourly: dict[str, Any] | None
daily: dict[str, Any] | None
monthly: dict[str, Any] | None
class TibberPricesSubscriptionData(TypedDict):
"""Type for price info data structure."""
priceInfo: TibberPricesPriceInfo
priceRating: TibberPricesPriceRating
class TibberPricesData(TypedDict):
"""Type for Tibber API response data structure."""
data: dict[str, dict[str, list[dict[str, TibberPricesSubscriptionData]]]]
@callback
def _raise_no_data() -> None: def _raise_no_data() -> None:
"""Raise error when no data is available.""" """Raise error when no data is available."""
raise TibberPricesApiClientError(NO_DATA_ERROR_MSG) raise TibberPricesApiClientError(NO_DATA_ERROR_MSG)
@callback
def _get_latest_timestamp_from_prices(price_data: TibberPricesData | None) -> datetime | None:
"""Get the latest timestamp from price data."""
if not price_data or "data" not in price_data:
return None
try:
subscription = price_data["data"]["viewer"]["homes"][0]["currentSubscription"]
price_info = subscription["priceInfo"]
latest_timestamp = None
# Check today's prices
if today_prices := price_info.get("today"):
for price in today_prices:
if starts_at := price.get("startsAt"):
timestamp = dt_util.parse_datetime(starts_at)
if timestamp and (not latest_timestamp or timestamp > latest_timestamp):
latest_timestamp = timestamp
# Check tomorrow's prices
if tomorrow_prices := price_info.get("tomorrow"):
for price in tomorrow_prices:
if starts_at := price.get("startsAt"):
timestamp = dt_util.parse_datetime(starts_at)
if timestamp and (not latest_timestamp or timestamp > latest_timestamp):
latest_timestamp = timestamp
return latest_timestamp
except (KeyError, IndexError, TypeError):
return None
@callback
def _get_latest_timestamp_from_rating(rating_data: TibberPricesData | None) -> datetime | None:
"""Get the latest timestamp from rating data."""
if not rating_data or "data" not in rating_data:
return None
try:
subscription = rating_data["data"]["viewer"]["homes"][0]["currentSubscription"]
price_rating = subscription["priceRating"]
latest_timestamp = None
# Check all rating types (hourly, daily, monthly)
for rating_type in ["hourly", "daily", "monthly"]:
if rating_entries := price_rating.get(rating_type, {}).get("entries", []):
for entry in rating_entries:
if time := entry.get("time"):
timestamp = dt_util.parse_datetime(time)
if timestamp and (not latest_timestamp or timestamp > latest_timestamp):
latest_timestamp = timestamp
return latest_timestamp
except (KeyError, IndexError, TypeError):
return None
# https://developers.home-assistant.io/docs/integration_fetching_data#coordinated-single-api-poll-for-data-for-all-entities # https://developers.home-assistant.io/docs/integration_fetching_data#coordinated-single-api-poll-for-data-for-all-entities
class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator): class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[TibberPricesData]):
"""Class to manage fetching data from the API.""" """Class to manage fetching data from the API."""
def __init__( def __init__(
@ -56,142 +148,505 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator):
self.config_entry = entry self.config_entry = entry
storage_key = f"{DOMAIN}.{entry.entry_id}" storage_key = f"{DOMAIN}.{entry.entry_id}"
self._store = Store(hass, STORAGE_VERSION, storage_key) self._store = Store(hass, STORAGE_VERSION, storage_key)
self._cached_price_data: dict | None = None self._cached_price_data: TibberPricesData | None = None
self._cached_rating_data: dict | None = None self._cached_rating_data_hourly: TibberPricesData | None = None
self._cached_rating_data_daily: TibberPricesData | None = None
self._cached_rating_data_monthly: TibberPricesData | None = None
self._last_price_update: datetime | None = None self._last_price_update: datetime | None = None
self._last_rating_update: datetime | None = None self._last_rating_update_hourly: datetime | None = None
self._last_rating_update_daily: datetime | None = None
self._last_rating_update_monthly: datetime | None = None
self._scheduled_price_update: asyncio.Task | None = None self._scheduled_price_update: asyncio.Task | None = None
self._remove_update_listener = None self._remove_update_listeners: list[Any] = []
self._force_update = False
# Schedule additional updates at the start of every hour # Schedule updates at the start of every hour
self._remove_update_listener = hass.helpers.event.async_track_time_change( self._remove_update_listeners.append(
self.async_request_refresh, minute=0, second=0 hass.helpers.event.async_track_time_change(
self._async_refresh_hourly, minute=0, second=0
)
) )
async def shutdown(self) -> None: # Schedule data rotation at midnight
self._remove_update_listeners.append(
hass.helpers.event.async_track_time_change(
self._async_handle_midnight_rotation, hour=0, minute=0, second=0
)
)
async def async_shutdown(self) -> None:
"""Clean up coordinator on shutdown.""" """Clean up coordinator on shutdown."""
if self._remove_update_listener: await super().async_shutdown()
self._remove_update_listener() for listener in self._remove_update_listeners:
listener()
async def _async_handle_midnight_rotation(self, _now: datetime | None = None) -> None:
"""Handle data rotation at midnight."""
if not self._cached_price_data:
return
try:
LOGGER.debug("Starting midnight data rotation")
subscription = self._cached_price_data["data"]["viewer"]["homes"][0]["currentSubscription"]
price_info = subscription["priceInfo"]
# Move today's data to yesterday
if today_data := price_info.get("today"):
price_info["yesterday"] = today_data
# Move tomorrow's data to today
if tomorrow_data := price_info.get("tomorrow"):
price_info["today"] = tomorrow_data
price_info["tomorrow"] = []
else:
price_info["today"] = []
# Store the rotated data
await self._store_cache()
LOGGER.debug("Completed midnight data rotation")
# Trigger an update to refresh the entities
await self.async_request_refresh()
except Exception as ex:
LOGGER.error("Error during midnight data rotation: %s", ex)
async def _async_initialize(self) -> None: async def _async_initialize(self) -> None:
"""Load stored data.""" """Load stored data."""
stored = await self._store.async_load() stored = await self._store.async_load()
LOGGER.debug("Loading stored data: %s", stored)
if stored: if stored:
self._cached_price_data = stored.get("price_data") # Load cached data
self._cached_rating_data = stored.get("rating_data") self._cached_price_data = cast(TibberPricesData, stored.get("price_data"))
self._cached_rating_data_hourly = cast(TibberPricesData, stored.get("rating_data_hourly"))
self._cached_rating_data_daily = cast(TibberPricesData, stored.get("rating_data_daily"))
self._cached_rating_data_monthly = cast(TibberPricesData, stored.get("rating_data_monthly"))
# Get timestamps from the actual data first
latest_price_timestamp = None
latest_hourly_timestamp = None
latest_daily_timestamp = None
latest_monthly_timestamp = None
if self._cached_price_data:
latest_price_timestamp = _get_latest_timestamp_from_prices(self._cached_price_data)
if latest_price_timestamp and not stored.get("last_price_update"):
self._last_price_update = latest_price_timestamp
LOGGER.debug("Recovered price update timestamp from data: %s", self._last_price_update)
if self._cached_rating_data_hourly:
latest_hourly_timestamp = self._get_latest_timestamp_from_rating_type(
self._cached_rating_data_hourly, "hourly"
)
if latest_hourly_timestamp and not stored.get("last_rating_update_hourly"):
self._last_rating_update_hourly = latest_hourly_timestamp
LOGGER.debug("Recovered hourly rating timestamp from data: %s", self._last_rating_update_hourly)
if self._cached_rating_data_daily:
latest_daily_timestamp = self._get_latest_timestamp_from_rating_type(
self._cached_rating_data_daily, "daily"
)
if latest_daily_timestamp and not stored.get("last_rating_update_daily"):
self._last_rating_update_daily = latest_daily_timestamp
LOGGER.debug("Recovered daily rating timestamp from data: %s", self._last_rating_update_daily)
if self._cached_rating_data_monthly:
latest_monthly_timestamp = self._get_latest_timestamp_from_rating_type(
self._cached_rating_data_monthly, "monthly"
)
if latest_monthly_timestamp and not stored.get("last_rating_update_monthly"):
self._last_rating_update_monthly = latest_monthly_timestamp
LOGGER.debug("Recovered monthly rating timestamp from data: %s", self._last_rating_update_monthly)
# Then load stored timestamps if they exist
if last_price := stored.get("last_price_update"): if last_price := stored.get("last_price_update"):
self._last_price_update = dt_util.parse_datetime(last_price) self._last_price_update = dt_util.parse_datetime(last_price)
if last_rating := stored.get("last_rating_update"): if last_rating_hourly := stored.get("last_rating_update_hourly"):
self._last_rating_update = dt_util.parse_datetime(last_rating) self._last_rating_update_hourly = dt_util.parse_datetime(last_rating_hourly)
if last_rating_daily := stored.get("last_rating_update_daily"):
self._last_rating_update_daily = dt_util.parse_datetime(last_rating_daily)
if last_rating_monthly := stored.get("last_rating_update_monthly"):
self._last_rating_update_monthly = dt_util.parse_datetime(last_rating_monthly)
LOGGER.debug( LOGGER.debug(
"Loaded stored cache data - Price from: %s, Rating from: %s", "Loaded stored cache data - "
"Price update: %s (latest data: %s), "
"Rating hourly: %s (latest data: %s), "
"daily: %s (latest data: %s), "
"monthly: %s (latest data: %s)",
self._last_price_update, self._last_price_update,
self._last_rating_update, latest_price_timestamp,
self._last_rating_update_hourly,
latest_hourly_timestamp,
self._last_rating_update_daily,
latest_daily_timestamp,
self._last_rating_update_monthly,
latest_monthly_timestamp,
) )
async def _async_update_data(self) -> Any: async def _async_refresh_hourly(self, *_: Any) -> None:
"""Update data via library.""" """Handle the hourly refresh - don't force update."""
await self.async_refresh()
async def _async_update_data(self) -> TibberPricesData:
"""Fetch new state data for the coordinator.
This method will:
1. Initialize cached data if none exists
2. Force update if requested via async_request_refresh
3. Otherwise, check if update conditions are met
4. Use cached data as fallback if API call fails
"""
if self._cached_price_data is None: if self._cached_price_data is None:
# First run after startup, load stored data
await self._async_initialize() await self._async_initialize()
try: try:
data = await self._update_all_data()
except TibberPricesApiClientAuthenticationError as exception:
raise ConfigEntryAuthFailed(exception) from exception
except TibberPricesApiClientError as exception:
raise UpdateFailed(exception) from exception
else:
return data
async def _update_all_data(self) -> dict[str, Any]:
"""Update all data and manage cache."""
current_time = dt_util.now() current_time = dt_util.now()
processed_data: dict[str, Any] | None = None
is_initial_setup = self._cached_price_data is None
# Handle price data update if needed # If force update requested, fetch all data
if self._should_update_price_data(current_time): if self._force_update:
# Check if we're within the allowed time window for price updates
# or if this is initial setup
current_hour = current_time.hour
if is_initial_setup or self._is_price_update_window(current_hour):
# Add random delay only for regular updates, not initial setup
if not is_initial_setup:
delay = random.randint(0, 120) # noqa: S311
LOGGER.debug( LOGGER.debug(
"Adding random delay of %d minutes before price update", "Force updating data",
delay, extra={
"reason": "force_update",
"last_success": self.last_update_success,
"last_price_update": self._last_price_update,
"last_rating_updates": {
"hourly": self._last_rating_update_hourly,
"daily": self._last_rating_update_daily,
"monthly": self._last_rating_update_monthly,
}
}
) )
await asyncio.sleep(delay * 60) self._force_update = False # Reset force update flag
return await self._fetch_all_data()
# Get fresh price data # Check if we need to update based on conditions
data = await self._fetch_price_data() should_update_price = self._should_update_price_data(current_time)
self._cached_price_data = self._extract_price_data(data) should_update_hourly = self._should_update_rating_type(
self._last_price_update = current_time current_time, self._cached_rating_data_hourly, self._last_rating_update_hourly, "hourly"
await self._store_cache() )
LOGGER.debug("Updated price data cache at %s", current_time) should_update_daily = self._should_update_rating_type(
processed_data = data current_time, self._cached_rating_data_daily, self._last_rating_update_daily, "daily"
)
should_update_monthly = self._should_update_rating_type(
current_time, self._cached_rating_data_monthly, self._last_rating_update_monthly, "monthly"
)
# Handle rating data update if needed if any([should_update_price, should_update_hourly, should_update_daily, should_update_monthly]):
if self._should_update_rating_data(current_time):
rating_data = await self._get_rating_data()
self._cached_rating_data = self._extract_rating_data(rating_data)
self._last_rating_update = current_time
await self._store_cache()
LOGGER.debug("Updated rating data cache at %s", current_time)
processed_data = rating_data
# If we have cached data but no updates were needed
if (
processed_data is None
and self._cached_price_data
and self._cached_rating_data
):
LOGGER.debug( LOGGER.debug(
"Using cached data - Price from: %s, Rating from: %s", "Updating data based on conditions",
self._last_price_update, extra={
self._last_rating_update, "update_price": should_update_price,
"update_hourly": should_update_hourly,
"update_daily": should_update_daily,
"update_monthly": should_update_monthly,
}
) )
processed_data = self._merge_cached_data() return await self._fetch_all_data()
if processed_data is None: # Use cached data if no update needed
if self._cached_price_data is not None:
LOGGER.debug("Using cached data")
return self._merge_all_cached_data()
# If we have no cached data and no updates needed, fetch new data
LOGGER.debug("No cached data available, fetching new data")
return await self._fetch_all_data()
except TibberPricesApiClientAuthenticationError as exception:
LOGGER.error(
"Authentication failed",
extra={"error": str(exception), "error_type": "auth_failed"}
)
raise ConfigEntryAuthFailed("Authentication failed while fetching data") from exception
except TibberPricesApiClientCommunicationError as exception:
LOGGER.error(
"API communication error",
extra={"error": str(exception), "error_type": "communication_error"}
)
if self._cached_price_data is not None:
LOGGER.info("Using cached data as fallback")
return self._merge_all_cached_data()
raise UpdateFailed(f"Error communicating with API: {exception}") from exception
except TibberPricesApiClientError as exception:
LOGGER.error(
"API client error",
extra={"error": str(exception), "error_type": "client_error"}
)
if self._cached_price_data is not None:
LOGGER.info("Using cached data as fallback")
return self._merge_all_cached_data()
raise UpdateFailed(f"Error fetching data: {exception}") from exception
except Exception as exception:
LOGGER.exception(
"Unexpected error",
extra={"error": str(exception), "error_type": "unexpected"}
)
if self._cached_price_data is not None:
LOGGER.info("Using cached data as fallback")
return self._merge_all_cached_data()
raise UpdateFailed(f"Unexpected error: {exception}") from exception
async def _fetch_all_data(self) -> TibberPricesData:
"""Fetch all data from the API without checking update conditions.
This method will:
1. Fetch all required data (price and rating data)
2. Validate that all data is complete and valid
3. Only then update the cache with the new data
4. If any data is invalid, keep using the cached data
"""
current_time = dt_util.now()
new_data = {
"price_data": None,
"rating_data": {
"hourly": None,
"daily": None,
"monthly": None
}
}
# First fetch all data without updating cache
try:
# Fetch price data
price_data = await self._fetch_price_data()
new_data["price_data"] = self._extract_price_data(price_data)
# Fetch all rating data
for rating_type in ["hourly", "daily", "monthly"]:
try:
rating_data = await self._get_rating_data_for_type(rating_type)
new_data["rating_data"][rating_type] = rating_data
except TibberPricesApiClientError as ex:
LOGGER.error("Failed to fetch %s rating data: %s", rating_type, ex)
# Don't raise here, we'll check completeness later
except TibberPricesApiClientError as ex:
LOGGER.error("Failed to fetch price data: %s", ex)
if self._cached_price_data is not None:
LOGGER.info("Using cached data as fallback after price data fetch failure")
return self._merge_all_cached_data()
raise
# Validate that we have all required data
if new_data["price_data"] is None:
LOGGER.error("No price data available after fetch")
if self._cached_price_data is not None:
LOGGER.info("Using cached data as fallback due to missing price data")
return self._merge_all_cached_data()
_raise_no_data() _raise_no_data()
return cast("dict[str, Any]", processed_data) # Only update cache if we have valid data
self._cached_price_data = cast(TibberPricesData, new_data["price_data"])
self._last_price_update = current_time
# Update rating data cache only for types that were successfully fetched
for rating_type, rating_data in new_data["rating_data"].items():
if rating_data is not None:
if rating_type == "hourly":
self._cached_rating_data_hourly = cast(TibberPricesData, rating_data)
self._last_rating_update_hourly = current_time
elif rating_type == "daily":
self._cached_rating_data_daily = cast(TibberPricesData, rating_data)
self._last_rating_update_daily = current_time
else: # monthly
self._cached_rating_data_monthly = cast(TibberPricesData, rating_data)
self._last_rating_update_monthly = current_time
LOGGER.debug("Updated %s rating data cache at %s", rating_type, current_time)
# Store the updated cache
await self._store_cache()
LOGGER.debug("Updated and stored all cache data at %s", current_time)
# Return merged data
return self._merge_all_cached_data()
async def _store_cache(self) -> None: async def _store_cache(self) -> None:
"""Store cache data.""" """Store cache data."""
last_price = ( # Recover any missing timestamps from the data
self._last_price_update.isoformat() if self._last_price_update else None if self._cached_price_data and not self._last_price_update:
) latest_timestamp = _get_latest_timestamp_from_prices(self._cached_price_data)
last_rating = ( if latest_timestamp:
self._last_rating_update.isoformat() if self._last_rating_update else None self._last_price_update = latest_timestamp
) LOGGER.debug("Setting missing price update timestamp to: %s", self._last_price_update)
rating_types = {
"hourly": (self._cached_rating_data_hourly, self._last_rating_update_hourly),
"daily": (self._cached_rating_data_daily, self._last_rating_update_daily),
"monthly": (self._cached_rating_data_monthly, self._last_rating_update_monthly),
}
for rating_type, (cached_data, last_update) in rating_types.items():
if cached_data and not last_update:
latest_timestamp = self._get_latest_timestamp_from_rating_type(cached_data, rating_type)
if latest_timestamp:
if rating_type == "hourly":
self._last_rating_update_hourly = latest_timestamp
elif rating_type == "daily":
self._last_rating_update_daily = latest_timestamp
else: # monthly
self._last_rating_update_monthly = latest_timestamp
LOGGER.debug("Setting missing %s rating timestamp to: %s", rating_type, latest_timestamp)
data = { data = {
"price_data": self._cached_price_data, "price_data": self._cached_price_data,
"rating_data": self._cached_rating_data, "rating_data_hourly": self._cached_rating_data_hourly,
"last_price_update": last_price, "rating_data_daily": self._cached_rating_data_daily,
"last_rating_update": last_rating, "rating_data_monthly": self._cached_rating_data_monthly,
"last_price_update": self._last_price_update.isoformat() if self._last_price_update else None,
"last_rating_update_hourly": self._last_rating_update_hourly.isoformat() if self._last_rating_update_hourly else None,
"last_rating_update_daily": self._last_rating_update_daily.isoformat() if self._last_rating_update_daily else None,
"last_rating_update_monthly": self._last_rating_update_monthly.isoformat() if self._last_rating_update_monthly else None,
} }
LOGGER.debug("Storing cache data with timestamps: %s", {
k: v for k, v in data.items() if k.startswith("last_")
})
await self._store.async_save(data) await self._store.async_save(data)
@callback
def _should_update_price_data(self, current_time: datetime) -> bool: def _should_update_price_data(self, current_time: datetime) -> bool:
"""Check if price data should be updated.""" """Check if price data should be updated."""
return ( # If no cached data, we definitely need an update
self._cached_price_data is None if self._cached_price_data is None:
or self._last_price_update is None LOGGER.debug("No cached price data available, update needed")
or current_time - self._last_price_update >= PRICE_UPDATE_INTERVAL return True
# Get the latest timestamp from our price data
latest_price_timestamp = _get_latest_timestamp_from_prices(self._cached_price_data)
if not latest_price_timestamp:
LOGGER.debug("No valid timestamp found in price data, update needed")
return True
# If we have price data but no last_update timestamp, set it
if not self._last_price_update:
self._last_price_update = latest_price_timestamp
LOGGER.debug("Setting missing price update timestamp in check: %s", self._last_price_update)
# Check if we're in the update window (13:00-15:00)
current_hour = current_time.hour
in_update_window = PRICE_UPDATE_RANDOM_MIN_HOUR <= current_hour <= PRICE_UPDATE_RANDOM_MAX_HOUR
# Get tomorrow's date at midnight
tomorrow = (current_time + timedelta(days=1)).replace(
hour=0, minute=0, second=0, microsecond=0
) )
def _should_update_rating_data(self, current_time: datetime) -> bool: # If we're in the update window and don't have tomorrow's complete data
"""Check if rating data should be updated.""" if in_update_window and latest_price_timestamp < tomorrow:
return ( LOGGER.debug(
self._cached_rating_data is None "In update window (%d:00) and latest price timestamp (%s) is before tomorrow, update needed",
or self._last_rating_update is None current_hour,
or current_time - self._last_rating_update >= RATING_UPDATE_INTERVAL latest_price_timestamp,
)
return True
# If it's been more than 24 hours since our last update
if (
self._last_price_update
and current_time - self._last_price_update >= UPDATE_INTERVAL
):
LOGGER.debug(
"More than 24 hours since last price update (%s), update needed",
self._last_price_update,
)
return True
LOGGER.debug(
"No price update needed - Last update: %s, Latest data: %s",
self._last_price_update,
latest_price_timestamp,
)
return False
@callback
def _should_update_rating_type(
self,
current_time: datetime,
cached_data: TibberPricesData | None,
last_update: datetime | None,
rating_type: str,
) -> bool:
"""Check if specific rating type should be updated."""
# If no cached data, we definitely need an update
if cached_data is None:
LOGGER.debug("No cached %s rating data available, update needed", rating_type)
return True
# Get the latest timestamp from our rating data
latest_timestamp = self._get_latest_timestamp_from_rating_type(cached_data, rating_type)
if not latest_timestamp:
LOGGER.debug("No valid timestamp found in %s rating data, update needed", rating_type)
return True
# If we have rating data but no last_update timestamp, set it
if not last_update:
if rating_type == "hourly":
self._last_rating_update_hourly = latest_timestamp
elif rating_type == "daily":
self._last_rating_update_daily = latest_timestamp
else: # monthly
self._last_rating_update_monthly = latest_timestamp
LOGGER.debug("Setting missing %s rating timestamp in check: %s", rating_type, latest_timestamp)
last_update = latest_timestamp
current_hour = current_time.hour
in_update_window = PRICE_UPDATE_RANDOM_MIN_HOUR <= current_hour <= PRICE_UPDATE_RANDOM_MAX_HOUR
if rating_type == "monthly":
# For monthly ratings:
# 1. Check if we have data for the current month
# 2. Update more frequently as the data changes throughout the month
current_month_start = current_time.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
if latest_timestamp < current_month_start:
LOGGER.debug(
"Monthly rating data is from previous month (%s), update needed",
latest_timestamp,
)
return True
# Update monthly data daily to get updated calculations
if last_update and current_time - last_update >= timedelta(days=1):
LOGGER.debug(
"More than 24 hours since last monthly rating update (%s), update needed for latest calculations",
last_update,
)
return True
else:
# For hourly and daily ratings:
# Get tomorrow's date at midnight
tomorrow = (current_time + timedelta(days=1)).replace(
hour=0, minute=0, second=0, microsecond=0
) )
# If we're in the update window and don't have tomorrow's data
if in_update_window and latest_timestamp < tomorrow:
LOGGER.debug(
"In update window and %s rating data (%s) is before tomorrow, update needed",
rating_type,
latest_timestamp,
)
return True
# If it's been more than 24 hours since our last update
if current_time - last_update >= UPDATE_INTERVAL:
LOGGER.debug(
"More than 24 hours since last %s rating update (%s), update needed",
rating_type,
last_update,
)
return True
LOGGER.debug(
"No %s rating update needed - Last update: %s, Latest data: %s",
rating_type,
last_update,
latest_timestamp,
)
return False
@callback
def _is_price_update_window(self, current_hour: int) -> bool: def _is_price_update_window(self, current_hour: int) -> bool:
"""Check if current hour is within price update window.""" """Check if current hour is within price update window."""
return ( return (
@ -203,59 +658,103 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator):
client = self.config_entry.runtime_data.client client = self.config_entry.runtime_data.client
return await client.async_get_price_info() return await client.async_get_price_info()
@callback
def _extract_price_data(self, data: dict) -> dict: def _extract_price_data(self, data: dict) -> dict:
"""Extract price data for caching.""" """Extract price data for caching."""
try:
# Try to access data in the transformed structure first
try:
price_info = data["viewer"]["homes"][0]["currentSubscription"]["priceInfo"] price_info = data["viewer"]["homes"][0]["currentSubscription"]["priceInfo"]
return { except KeyError:
"data": { # If that fails, try the raw data structure
"viewer": { price_info = data["data"]["viewer"]["homes"][0]["currentSubscription"]["priceInfo"]
"homes": [{"currentSubscription": {"priceInfo": price_info}}]
} # Ensure we have all required fields
} extracted_price_info = {
"today": price_info.get("today", []),
"tomorrow": price_info.get("tomorrow", []),
"yesterday": price_info.get("yesterday", []),
} }
def _extract_rating_data(self, data: dict) -> dict:
"""Extract rating data for caching."""
return { return {
"data": { "data": {
"viewer": { "viewer": {
"homes": [ "homes": [{
{
"currentSubscription": { "currentSubscription": {
"priceRating": data["data"]["viewer"]["homes"][0][ "priceInfo": extracted_price_info
"currentSubscription" }
]["priceRating"] }]
} }
} }
] }
except (KeyError, IndexError) as ex:
LOGGER.error("Error extracting price data: %s", ex)
return {
"data": {
"viewer": {
"homes": [{
"currentSubscription": {
"priceInfo": {
"today": [],
"tomorrow": [],
"yesterday": [],
}
}
}]
} }
} }
} }
def _merge_cached_data(self) -> dict: @callback
"""Merge cached price and rating data.""" def _get_latest_timestamp_from_rating_type(
if not self._cached_price_data or not self._cached_rating_data: self, rating_data: TibberPricesData | None, rating_type: str
return {} ) -> datetime | None:
"""Get the latest timestamp from a specific rating type."""
if not rating_data or "data" not in rating_data:
return None
subscription = { try:
"priceInfo": self._cached_price_data["data"]["viewer"]["homes"][0][ subscription = rating_data["data"]["viewer"]["homes"][0]["currentSubscription"]
"currentSubscription" price_rating = subscription["priceRating"]
]["priceInfo"], latest_timestamp = None
"priceRating": self._cached_rating_data["data"]["viewer"]["homes"][0][
"currentSubscription"
]["priceRating"],
}
return {"data": {"viewer": {"homes": [{"currentSubscription": subscription}]}}} if rating_entries := price_rating.get(rating_type, {}).get("entries", []):
for entry in rating_entries:
if time := entry.get("time"):
timestamp = dt_util.parse_datetime(time)
if timestamp and (not latest_timestamp or timestamp > latest_timestamp):
latest_timestamp = timestamp
async def _get_rating_data(self) -> dict: return latest_timestamp
"""Get fresh rating data from API."""
except (KeyError, IndexError, TypeError):
return None
async def _get_rating_data_for_type(self, rating_type: str) -> dict:
"""Get fresh rating data for a specific type."""
client = self.config_entry.runtime_data.client client = self.config_entry.runtime_data.client
daily = await client.async_get_daily_price_rating()
hourly = await client.async_get_hourly_price_rating()
monthly = await client.async_get_monthly_price_rating()
rating_base = daily["viewer"]["homes"][0]["currentSubscription"]["priceRating"] if rating_type == "hourly":
data = await client.async_get_hourly_price_rating()
elif rating_type == "daily":
data = await client.async_get_daily_price_rating()
else: # monthly
data = await client.async_get_monthly_price_rating()
try:
# Try to access data in the transformed structure first
rating = data["viewer"]["homes"][0]["currentSubscription"]["priceRating"]
except KeyError:
try:
# If that fails, try the raw data structure
rating = data["data"]["viewer"]["homes"][0]["currentSubscription"]["priceRating"]
except KeyError as ex:
LOGGER.error("Failed to extract rating data: %s", ex)
raise TibberPricesApiClientError(
TibberPricesApiClientError.EMPTY_DATA_ERROR.format(
query_type=rating_type
)
) from ex
return { return {
"data": { "data": {
@ -264,16 +763,8 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator):
{ {
"currentSubscription": { "currentSubscription": {
"priceRating": { "priceRating": {
"thresholdPercentages": rating_base[ "thresholdPercentages": rating["thresholdPercentages"],
"thresholdPercentages" rating_type: rating[rating_type],
],
"daily": rating_base["daily"],
"hourly": hourly["viewer"]["homes"][0][
"currentSubscription"
]["priceRating"]["hourly"],
"monthly": monthly["viewer"]["homes"][0][
"currentSubscription"
]["priceRating"]["monthly"],
} }
} }
} }
@ -281,3 +772,55 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator):
} }
} }
} }
@callback
def _merge_all_cached_data(self) -> TibberPricesData:
"""Merge all cached data."""
if not self._cached_price_data:
return cast(TibberPricesData, {})
# Start with price info
subscription = {
"priceInfo": self._cached_price_data["data"]["viewer"]["homes"][0][
"currentSubscription"
]["priceInfo"],
"priceRating": {
"thresholdPercentages": None, # Will be set from any available rating data
}
}
# Add rating data if available
rating_data = {
"hourly": self._cached_rating_data_hourly,
"daily": self._cached_rating_data_daily,
"monthly": self._cached_rating_data_monthly,
}
for rating_type, data in rating_data.items():
if data and "data" in data:
rating = data["data"]["viewer"]["homes"][0]["currentSubscription"]["priceRating"]
# Set thresholdPercentages from any available rating data
if not subscription["priceRating"]["thresholdPercentages"]:
subscription["priceRating"]["thresholdPercentages"] = rating["thresholdPercentages"]
# Add the specific rating type data
subscription["priceRating"][rating_type] = rating[rating_type]
return cast(TibberPricesData, {"data": {"viewer": {"homes": [{"currentSubscription": subscription}]}}})
async def async_request_refresh(self) -> None:
"""Request an immediate refresh of the data.
This method will:
1. Set the force update flag to trigger a fresh data fetch
2. Call async_refresh to perform the update
The force update flag will be reset after the update is complete.
"""
self._force_update = True
await self.async_refresh()
def _transform_api_response(self, data: dict[str, Any]) -> TibberPricesData:
"""Transform API response to coordinator data format."""
return cast(TibberPricesData, data)