From adc11b0e4d39bf17ef2d470f867732b19c7e1b44 Mon Sep 17 00:00:00 2001 From: Julian Pawlowski Date: Tue, 20 May 2025 10:41:01 +0000 Subject: [PATCH] refactoring --- custom_components/tibber_prices/api.py | 125 +++-- .../tibber_prices/binary_sensor.py | 10 +- .../tibber_prices/coordinator.py | 459 ++++++------------ custom_components/tibber_prices/sensor.py | 26 +- 4 files changed, 245 insertions(+), 375 deletions(-) diff --git a/custom_components/tibber_prices/api.py b/custom_components/tibber_prices/api.py index 4c79b01..89ade98 100644 --- a/custom_components/tibber_prices/api.py +++ b/custom_components/tibber_prices/api.py @@ -269,6 +269,27 @@ def _transform_price_info(data: dict) -> dict: return data +def _flatten_price_info(subscription: dict) -> dict: + """Extract and flatten priceInfo from subscription.""" + price_info = subscription.get("priceInfo", {}) + return { + "yesterday": price_info.get("yesterday", []), + "today": price_info.get("today", []), + "tomorrow": price_info.get("tomorrow", []), + } + + +def _flatten_price_rating(subscription: dict) -> dict: + """Extract and flatten priceRating from subscription.""" + price_rating = subscription.get("priceRating", {}) + return { + "hourly": price_rating.get("hourly", {}).get("entries", []), + "daily": price_rating.get("daily", {}).get("entries", []), + "monthly": price_rating.get("monthly", {}).get("entries", []), + "thresholdPercentages": price_rating.get("thresholdPercentages"), + } + + class TibberPricesApiClient: """Tibber API Client.""" @@ -314,9 +335,9 @@ class TibberPricesApiClient: query_type=QueryType.VIEWER, ) - async def async_get_price_info(self) -> Any: - """Get price info data including today, tomorrow and last 48 hours.""" - return await self._api_wrapper( + async def async_get_price_info(self) -> dict: + """Get price info data in flat format.""" + response = await self._api_wrapper( data={ "query": """ {viewer{homes{id,currentSubscription{priceInfo{ @@ -329,10 +350,16 @@ class TibberPricesApiClient: }, query_type=QueryType.PRICE_INFO, ) + # response is already transformed, but we want flat + try: + subscription = response["viewer"]["homes"][0]["currentSubscription"] + except KeyError: + subscription = response["data"]["viewer"]["homes"][0]["currentSubscription"] + return {"priceInfo": _flatten_price_info(subscription)} - async def async_get_daily_price_rating(self) -> Any: - """Get daily price rating data.""" - return await self._api_wrapper( + async def async_get_daily_price_rating(self) -> dict: + """Get daily price rating data in flat format.""" + response = await self._api_wrapper( data={ "query": """ {viewer{homes{id,currentSubscription{priceRating{ @@ -345,10 +372,20 @@ class TibberPricesApiClient: }, query_type=QueryType.DAILY_RATING, ) + try: + subscription = response["viewer"]["homes"][0]["currentSubscription"] + except KeyError: + subscription = response["data"]["viewer"]["homes"][0]["currentSubscription"] + return { + "priceRating": { + "daily": _flatten_price_rating(subscription)["daily"], + "thresholdPercentages": _flatten_price_rating(subscription)["thresholdPercentages"], + } + } - async def async_get_hourly_price_rating(self) -> Any: - """Get hourly price rating data.""" - return await self._api_wrapper( + async def async_get_hourly_price_rating(self) -> dict: + """Get hourly price rating data in flat format.""" + response = await self._api_wrapper( data={ "query": """ {viewer{homes{id,currentSubscription{priceRating{ @@ -361,10 +398,20 @@ class TibberPricesApiClient: }, query_type=QueryType.HOURLY_RATING, ) + try: + subscription = response["viewer"]["homes"][0]["currentSubscription"] + except KeyError: + subscription = response["data"]["viewer"]["homes"][0]["currentSubscription"] + return { + "priceRating": { + "hourly": _flatten_price_rating(subscription)["hourly"], + "thresholdPercentages": _flatten_price_rating(subscription)["thresholdPercentages"], + } + } - async def async_get_monthly_price_rating(self) -> Any: - """Get monthly price rating data.""" - return await self._api_wrapper( + async def async_get_monthly_price_rating(self) -> dict: + """Get monthly price rating data in flat format.""" + response = await self._api_wrapper( data={ "query": """ {viewer{homes{id,currentSubscription{priceRating{ @@ -377,45 +424,33 @@ class TibberPricesApiClient: }, query_type=QueryType.MONTHLY_RATING, ) + try: + subscription = response["viewer"]["homes"][0]["currentSubscription"] + except KeyError: + subscription = response["data"]["viewer"]["homes"][0]["currentSubscription"] + return { + "priceRating": { + "monthly": _flatten_price_rating(subscription)["monthly"], + "thresholdPercentages": _flatten_price_rating(subscription)["thresholdPercentages"], + } + } - async def async_get_data(self) -> Any: - """Get all data from the API by combining multiple queries.""" - # Get all data concurrently + async def async_get_data(self) -> dict: + """Get all data from the API by combining multiple queries in flat format.""" price_info = await self.async_get_price_info() daily_rating = await self.async_get_daily_price_rating() hourly_rating = await self.async_get_hourly_price_rating() monthly_rating = await self.async_get_monthly_price_rating() - - # Extract the base paths to make the code more readable - def get_base_path(response: dict) -> dict: - """Get the base subscription path from the response.""" - return response["viewer"]["homes"][0]["currentSubscription"] - - def get_rating_data(response: dict) -> dict: - """Get the price rating data from the response.""" - return get_base_path(response)["priceRating"] - - price_info_data = get_base_path(price_info)["priceInfo"] - - # Combine the results + # Merge all into one flat dict + price_rating = { + "thresholdPercentages": daily_rating["priceRating"].get("thresholdPercentages"), + "daily": daily_rating["priceRating"].get("daily", []), + "hourly": hourly_rating["priceRating"].get("hourly", []), + "monthly": monthly_rating["priceRating"].get("monthly", []), + } return { - "data": { - "viewer": { - "homes": [ - { - "currentSubscription": { - "priceInfo": price_info_data, - "priceRating": { - "thresholdPercentages": get_rating_data(daily_rating)["thresholdPercentages"], - "daily": get_rating_data(daily_rating)["daily"], - "hourly": get_rating_data(hourly_rating)["hourly"], - "monthly": get_rating_data(monthly_rating)["monthly"], - }, - } - } - ] - } - } + "priceInfo": price_info["priceInfo"], + "priceRating": price_rating, } async def async_set_title(self, value: str) -> Any: diff --git a/custom_components/tibber_prices/binary_sensor.py b/custom_components/tibber_prices/binary_sensor.py index b9fa791..2bc2839 100644 --- a/custom_components/tibber_prices/binary_sensor.py +++ b/custom_components/tibber_prices/binary_sensor.py @@ -150,7 +150,7 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity): """Return True if tomorrow's data is fully available, False if not, None if unknown.""" if not self.coordinator.data: return None - price_info = self.coordinator.data["data"]["viewer"]["homes"][0]["currentSubscription"]["priceInfo"] + price_info = self.coordinator.data["priceInfo"] tomorrow_prices = price_info.get("tomorrow", []) interval_count = len(tomorrow_prices) if interval_count in TOMORROW_INTERVAL_COUNTS: @@ -163,7 +163,7 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity): """Return attributes for tomorrow_data_available binary sensor.""" if not self.coordinator.data: return None - price_info = self.coordinator.data["data"]["viewer"]["homes"][0]["currentSubscription"]["priceInfo"] + price_info = self.coordinator.data["priceInfo"] tomorrow_prices = price_info.get("tomorrow", []) interval_count = len(tomorrow_prices) if interval_count == 0: @@ -195,7 +195,7 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity): if not self.coordinator.data: return None - price_info = self.coordinator.data["data"]["viewer"]["homes"][0]["currentSubscription"]["priceInfo"] + price_info = self.coordinator.data["priceInfo"] today_prices = price_info.get("today", []) if not today_prices: @@ -488,7 +488,7 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity): """Get price interval attributes with support for 15-minute intervals and period grouping.""" if not self.coordinator.data: return None - price_info = self.coordinator.data["data"]["viewer"]["homes"][0]["currentSubscription"]["priceInfo"] + price_info = self.coordinator.data["priceInfo"] yesterday_prices = price_info.get("yesterday", []) today_prices = price_info.get("today", []) tomorrow_prices = price_info.get("tomorrow", []) @@ -530,7 +530,7 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity): if not self.coordinator.data: return None - price_info = self.coordinator.data["data"]["viewer"]["homes"][0]["currentSubscription"]["priceInfo"] + price_info = self.coordinator.data["priceInfo"] today_prices = price_info.get("today", []) if not today_prices: diff --git a/custom_components/tibber_prices/coordinator.py b/custom_components/tibber_prices/coordinator.py index 1bcb732..3a5b220 100644 --- a/custom_components/tibber_prices/coordinator.py +++ b/custom_components/tibber_prices/coordinator.py @@ -5,7 +5,7 @@ from __future__ import annotations import asyncio import logging from datetime import datetime, timedelta -from typing import TYPE_CHECKING, Any, Final, TypedDict, cast +from typing import TYPE_CHECKING, Any, Final, cast import homeassistant.util.dt as dt_util from homeassistant.core import HomeAssistant, callback @@ -22,8 +22,6 @@ from .api import ( from .const import DOMAIN, LOGGER if TYPE_CHECKING: - from collections.abc import Callable - from .data import TibberPricesConfigEntry _LOGGER = logging.getLogger(__name__) @@ -38,36 +36,6 @@ UPDATE_FAILED_MSG: Final = "Update failed" AUTH_FAILED_MSG: Final = "Authentication failed" -class TibberPricesPriceInfo(TypedDict): - """Type for price info data structure.""" - - today: list[dict[str, Any]] - tomorrow: list[dict[str, Any]] - yesterday: list[dict[str, Any]] - - -class TibberPricesPriceRating(TypedDict): - """Type for price rating data structure.""" - - thresholdPercentages: dict[str, float] | None - hourly: dict[str, Any] | None - daily: dict[str, Any] | None - monthly: dict[str, Any] | None - - -class TibberPricesSubscriptionData(TypedDict): - """Type for price info data structure.""" - - priceInfo: TibberPricesPriceInfo - priceRating: TibberPricesPriceRating - - -class TibberPricesData(TypedDict): - """Type for Tibber API response data structure.""" - - data: dict[str, dict[str, list[dict[str, TibberPricesSubscriptionData]]]] - - @callback def _raise_no_data() -> None: """Raise error when no data is available.""" @@ -76,15 +44,14 @@ def _raise_no_data() -> None: @callback def _get_latest_timestamp_from_prices( - price_data: TibberPricesData | None, + price_data: dict | None, ) -> datetime | None: """Get the latest timestamp from price data.""" - if not price_data or "data" not in price_data: + if not price_data or "priceInfo" not in price_data: return None try: - subscription = price_data["data"]["viewer"]["homes"][0]["currentSubscription"] - price_info = subscription["priceInfo"] + price_info = price_data["priceInfo"] latest_timestamp = None # Check today's prices @@ -111,20 +78,19 @@ def _get_latest_timestamp_from_prices( @callback def _get_latest_timestamp_from_rating( - rating_data: TibberPricesData | None, + rating_data: dict | None, ) -> datetime | None: """Get the latest timestamp from rating data.""" - if not rating_data or "data" not in rating_data: + if not rating_data or "priceRating" not in rating_data: return None try: - subscription = rating_data["data"]["viewer"]["homes"][0]["currentSubscription"] - price_rating = subscription["priceRating"] + price_rating = rating_data["priceRating"] latest_timestamp = None # Check all rating types (hourly, daily, monthly) for rating_type in ["hourly", "daily", "monthly"]: - if rating_entries := price_rating.get(rating_type, {}).get("entries", []): + if rating_entries := price_rating.get(rating_type, []): for entry in rating_entries: if time := entry.get("time"): timestamp = dt_util.parse_datetime(time) @@ -137,7 +103,7 @@ def _get_latest_timestamp_from_rating( # https://developers.home-assistant.io/docs/integration_fetching_data#coordinated-single-api-poll-for-data-for-all-entities -class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[TibberPricesData]): +class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict]): """Class to manage fetching data from the API.""" def __init__( @@ -152,10 +118,10 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[TibberPricesData]) self.config_entry = entry storage_key = f"{DOMAIN}.{entry.entry_id}" self._store = Store(hass, STORAGE_VERSION, storage_key) - self._cached_price_data: TibberPricesData | None = None - self._cached_rating_data_hourly: TibberPricesData | None = None - self._cached_rating_data_daily: TibberPricesData | None = None - self._cached_rating_data_monthly: TibberPricesData | None = None + self._cached_price_data: dict | None = None + self._cached_rating_data_hourly: dict | None = None + self._cached_rating_data_daily: dict | None = None + self._cached_rating_data_monthly: dict | None = None self._last_price_update: datetime | None = None self._last_rating_update_hourly: datetime | None = None self._last_rating_update_daily: datetime | None = None @@ -191,7 +157,7 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[TibberPricesData]) # Then do a regular refresh await self.async_refresh() - async def _async_update_data(self) -> TibberPricesData: + async def _async_update_data(self) -> dict: """ Fetch new state data for the coordinator. @@ -265,7 +231,7 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[TibberPricesData]) else: return result - async def _handle_conditional_update(self, current_time: datetime) -> TibberPricesData: + async def _handle_conditional_update(self, current_time: datetime) -> dict: """Handle conditional update based on update conditions.""" # Simplified conditional update checking update_conditions = self._check_update_conditions(current_time) @@ -309,7 +275,7 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[TibberPricesData]) ), } - async def _fetch_all_data(self) -> TibberPricesData: + async def _fetch_all_data(self) -> dict: """ Fetch all data from the API without checking update conditions. @@ -355,7 +321,7 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[TibberPricesData]) _raise_no_data() # Only update cache if we have valid data - self._cached_price_data = cast("TibberPricesData", new_data["price_data"]) + self._cached_price_data = cast("dict", new_data["price_data"]) self._last_price_update = current_time # Update rating data cache only for types that were successfully fetched @@ -371,61 +337,21 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[TibberPricesData]) return self._merge_all_cached_data() @callback - def _update_rating_cache(self, rating_type: str, rating_data: TibberPricesData, current_time: datetime) -> None: + def _update_rating_cache(self, rating_type: str, rating_data: dict, current_time: datetime) -> None: """Update the rating cache for a specific rating type.""" if rating_type == "hourly": - self._cached_rating_data_hourly = cast("TibberPricesData", rating_data) + self._cached_rating_data_hourly = cast("dict", rating_data) self._last_rating_update_hourly = current_time elif rating_type == "daily": - self._cached_rating_data_daily = cast("TibberPricesData", rating_data) + self._cached_rating_data_daily = cast("dict", rating_data) self._last_rating_update_daily = current_time else: # monthly - self._cached_rating_data_monthly = cast("TibberPricesData", rating_data) + self._cached_rating_data_monthly = cast("dict", rating_data) self._last_rating_update_monthly = current_time LOGGER.debug("Updated %s rating data cache at %s", rating_type, current_time) async def _store_cache(self) -> None: - """Store cache data.""" - - def _recover_and_log_timestamp( - data: TibberPricesData | None, - last_update: datetime | None, - get_latest_fn: Callable[[TibberPricesData], datetime | None], - label: str, - ) -> datetime | None: - if data and not last_update: - latest_timestamp = get_latest_fn(data) - if latest_timestamp: - LOGGER.debug("Setting missing %s timestamp to: %s", label, latest_timestamp) - return latest_timestamp - LOGGER.warning("Could not recover %s timestamp from data!", label) - return last_update - - self._last_price_update = _recover_and_log_timestamp( - self._cached_price_data, - self._last_price_update, - _get_latest_timestamp_from_prices, - "price update", - ) - self._last_rating_update_hourly = _recover_and_log_timestamp( - self._cached_rating_data_hourly, - self._last_rating_update_hourly, - lambda d: self._get_latest_timestamp_from_rating_type(d, "hourly"), - "hourly rating", - ) - self._last_rating_update_daily = _recover_and_log_timestamp( - self._cached_rating_data_daily, - self._last_rating_update_daily, - lambda d: self._get_latest_timestamp_from_rating_type(d, "daily"), - "daily rating", - ) - self._last_rating_update_monthly = _recover_and_log_timestamp( - self._cached_rating_data_monthly, - self._last_rating_update_monthly, - lambda d: self._get_latest_timestamp_from_rating_type(d, "monthly"), - "monthly rating", - ) - + """Store cache data in flat format.""" data = { "price_data": self._cached_price_data, "rating_data_hourly": self._cached_rating_data_hourly, @@ -446,7 +372,6 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[TibberPricesData]) "Storing cache data with timestamps: %s", {k: v for k, v in data.items() if k.startswith("last_")}, ) - # Defensive: warn if any required data is missing before saving if data["price_data"] is None: LOGGER.warning("Attempting to store cache with missing price_data!") if data["last_price_update"] is None: @@ -514,7 +439,7 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[TibberPricesData]) def _should_update_rating_type( self, current_time: datetime, - cached_data: TibberPricesData | None, + cached_data: dict | None, last_update: datetime | None, rating_type: str, ) -> bool: @@ -589,56 +514,30 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[TibberPricesData]) @callback def _extract_price_data(self, data: dict) -> dict: - """Extract price data for caching.""" + """Extract price data for caching in flat format.""" try: - # Try to access data in the transformed structure first - try: - price_info = data["viewer"]["homes"][0]["currentSubscription"]["priceInfo"] - except KeyError: - # If that fails, try the raw data structure - price_info = data["data"]["viewer"]["homes"][0]["currentSubscription"]["priceInfo"] - - # Ensure we have all required fields + price_info = data["priceInfo"] extracted_price_info = { + "yesterday": price_info.get("yesterday", []), "today": price_info.get("today", []), "tomorrow": price_info.get("tomorrow", []), - "yesterday": price_info.get("yesterday", []), } - except (KeyError, IndexError) as ex: + except (KeyError, IndexError, TypeError) as ex: LOGGER.error("Error extracting price data: %s", ex) - return { - "data": { - "viewer": { - "homes": [ - { - "currentSubscription": { - "priceInfo": { - "today": [], - "tomorrow": [], - "yesterday": [], - } - } - } - ] - } - } - } - return {"data": {"viewer": {"homes": [{"currentSubscription": {"priceInfo": extracted_price_info}}]}}} + extracted_price_info = {"yesterday": [], "today": [], "tomorrow": []} + return extracted_price_info @callback - def _get_latest_timestamp_from_rating_type( - self, rating_data: TibberPricesData | None, rating_type: str - ) -> datetime | None: + def _get_latest_timestamp_from_rating_type(self, rating_data: dict | None, rating_type: str) -> datetime | None: """Get the latest timestamp from a specific rating type.""" - if not rating_data or "data" not in rating_data: + if not rating_data or "priceRating" not in rating_data: return None try: - subscription = rating_data["data"]["viewer"]["homes"][0]["currentSubscription"] - price_rating = subscription["priceRating"] + price_rating = rating_data["priceRating"] result = None - if rating_entries := price_rating.get(rating_type, {}).get("entries", []): + if rating_entries := price_rating.get(rating_type, []): for entry in rating_entries: if time := entry.get("time"): timestamp = dt_util.parse_datetime(time) @@ -649,99 +548,135 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[TibberPricesData]) return result async def _get_rating_data_for_type(self, rating_type: str) -> dict: - """Get fresh rating data for a specific type.""" + """Get fresh rating data for a specific type in flat format.""" client = self.config_entry.runtime_data.client - if rating_type == "hourly": data = await client.async_get_hourly_price_rating() elif rating_type == "daily": data = await client.async_get_daily_price_rating() - else: # monthly + else: data = await client.async_get_monthly_price_rating() + # Accept both {"priceRating": {...}} and flat {rating_type: [...], ...} dicts try: - # Try to access data in the transformed structure first - rating = data["viewer"]["homes"][0]["currentSubscription"]["priceRating"] - except KeyError: - try: - # If that fails, try the raw data structure - rating = data["data"]["viewer"]["homes"][0]["currentSubscription"]["priceRating"] - except KeyError as ex: - LOGGER.error("Failed to extract rating data: %s", ex) - raise TibberPricesApiClientError( - TibberPricesApiClientError.EMPTY_DATA_ERROR.format(query_type=rating_type) - ) from ex - else: - return { - "data": { - "viewer": { - "homes": [ - { - "currentSubscription": { - "priceRating": { - "thresholdPercentages": rating["thresholdPercentages"], - rating_type: rating[rating_type], - } - } - } - ] - } - } - } - else: - return { - "data": { - "viewer": { - "homes": [ - { - "currentSubscription": { - "priceRating": { - "thresholdPercentages": rating["thresholdPercentages"], - rating_type: rating[rating_type], - } - } - } - ] - } - } - } + price_rating = data.get("priceRating", data) + threshold = price_rating.get("thresholdPercentages") + entries = price_rating.get(rating_type, []) + except KeyError as ex: + LOGGER.error("Failed to extract rating data (flat format): %s", ex) + raise TibberPricesApiClientError( + TibberPricesApiClientError.EMPTY_DATA_ERROR.format(query_type=rating_type) + ) from ex + return {"priceRating": {rating_type: entries, "thresholdPercentages": threshold}} @callback - def _merge_all_cached_data(self) -> TibberPricesData: - """Merge all cached data.""" + def _merge_all_cached_data(self) -> dict: + """Merge all cached data into flat format.""" if not self._cached_price_data: - return cast("TibberPricesData", {}) + return {} + # Wrap cached price data in 'priceInfo' only if not already wrapped + if "priceInfo" in self._cached_price_data: + merged = {"priceInfo": self._cached_price_data["priceInfo"]} + else: + merged = {"priceInfo": self._cached_price_data} + price_rating = {"hourly": [], "daily": [], "monthly": [], "thresholdPercentages": None} + for rating_type, cached in zip( + ["hourly", "daily", "monthly"], + [self._cached_rating_data_hourly, self._cached_rating_data_daily, self._cached_rating_data_monthly], + strict=True, + ): + if cached and "priceRating" in cached: + entries = cached["priceRating"].get(rating_type, []) + price_rating[rating_type] = entries + if not price_rating["thresholdPercentages"]: + price_rating["thresholdPercentages"] = cached["priceRating"].get("thresholdPercentages") + merged["priceRating"] = price_rating + return merged - # Start with price info - subscription = { - "priceInfo": self._cached_price_data["data"]["viewer"]["homes"][0]["currentSubscription"]["priceInfo"], - "priceRating": { - "thresholdPercentages": None, - }, - } + async def _async_initialize(self) -> None: + """Load stored data in flat format.""" + stored = await self._store.async_load() + if stored is None: + LOGGER.warning("No cache file found or cache is empty on startup.") + else: + LOGGER.debug("Loading stored data: %s", stored) + if stored: + self._cached_price_data = stored.get("price_data") + self._cached_rating_data_hourly = stored.get("rating_data_hourly") + self._cached_rating_data_daily = stored.get("rating_data_daily") + self._cached_rating_data_monthly = stored.get("rating_data_monthly") + self._last_price_update = self._recover_timestamp(self._cached_price_data, stored.get("last_price_update")) + self._last_rating_update_hourly = self._recover_timestamp( + self._cached_rating_data_hourly, + stored.get("last_rating_update_hourly"), + "hourly", + ) + self._last_rating_update_daily = self._recover_timestamp( + self._cached_rating_data_daily, + stored.get("last_rating_update_daily"), + "daily", + ) + self._last_rating_update_monthly = self._recover_timestamp( + self._cached_rating_data_monthly, + stored.get("last_rating_update_monthly"), + "monthly", + ) + LOGGER.debug( + "Loaded stored cache data - Price update: %s, Rating hourly: %s, daily: %s, monthly: %s", + self._last_price_update, + self._last_rating_update_hourly, + self._last_rating_update_daily, + self._last_rating_update_monthly, + ) + if self._cached_price_data is None: + LOGGER.warning("Cached price data missing after cache load!") + if self._last_price_update is None: + LOGGER.warning("Price update timestamp missing after cache load!") + else: + LOGGER.info("No cache loaded; will fetch fresh data on first update.") - # Add rating data if available - rating_data = { - "hourly": self._cached_rating_data_hourly, - "daily": self._cached_rating_data_daily, - "monthly": self._cached_rating_data_monthly, - } + def get_all_intervals(self) -> list[dict]: + """Return a combined, sorted list of all price intervals for yesterday, today, and tomorrow.""" + if not self.data or "priceInfo" not in self.data: + return [] + price_info = self.data["priceInfo"] + all_prices = price_info.get("yesterday", []) + price_info.get("today", []) + price_info.get("tomorrow", []) + return sorted(all_prices, key=lambda p: p["startsAt"]) - for rating_type, data in rating_data.items(): - if data and "data" in data: - rating = data["data"]["viewer"]["homes"][0]["currentSubscription"]["priceRating"] + def get_interval_granularity(self) -> int | None: + """Return the interval granularity in minutes (e.g., 15 or 60) for today's data.""" + if not self.data or "priceInfo" not in self.data: + return None + today_prices = self.data["priceInfo"].get("today", []) + from .sensor import detect_interval_granularity - # Set thresholdPercentages from any available rating data - if not subscription["priceRating"]["thresholdPercentages"]: - subscription["priceRating"]["thresholdPercentages"] = rating["thresholdPercentages"] + return detect_interval_granularity(today_prices) if today_prices else None - # Add the specific rating type data - subscription["priceRating"][rating_type] = rating[rating_type] + def get_current_interval_data(self) -> dict | None: + """Return the price data for the current interval.""" + if not self.data or "priceInfo" not in self.data: + return None + price_info = self.data["priceInfo"] + now = dt_util.now() + interval_length = self.get_interval_granularity() + from .sensor import find_price_data_for_interval - return cast( - "TibberPricesData", - {"data": {"viewer": {"homes": [{"currentSubscription": subscription}]}}}, - ) + return find_price_data_for_interval(price_info, now, interval_length) + + def get_combined_price_info(self) -> dict: + """Return a dict with all intervals under a single key 'all'.""" + return {"all": self.get_all_intervals()} + + def is_tomorrow_data_available(self) -> bool | None: + """Return True if tomorrow's data is fully available, False if not, None if unknown.""" + if not self.data or "priceInfo" not in self.data: + return None + tomorrow_prices = self.data["priceInfo"].get("tomorrow", []) + interval_count = len(tomorrow_prices) + min_tomorrow_intervals_hourly = 24 + min_tomorrow_intervals_15min = 96 + tomorrow_interval_counts = {min_tomorrow_intervals_hourly, min_tomorrow_intervals_15min} + return interval_count in tomorrow_interval_counts async def async_request_refresh(self) -> None: """ @@ -756,9 +691,9 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[TibberPricesData]) self._force_update = True await self.async_refresh() - def _transform_api_response(self, data: dict[str, Any]) -> TibberPricesData: + def _transform_api_response(self, data: dict[str, Any]) -> dict: """Transform API response to coordinator data format.""" - return cast("TibberPricesData", data) + return cast("dict", data) async def _perform_midnight_rotation(self) -> None: """ @@ -775,8 +710,7 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[TibberPricesData]) async with self._rotation_lock: try: - subscription = self._cached_price_data["data"]["viewer"]["homes"][0]["currentSubscription"] - price_info = subscription["priceInfo"] + price_info = self._cached_price_data["priceInfo"] # Save current data state for logging today_count = len(price_info.get("today", [])) @@ -823,105 +757,10 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[TibberPricesData]) except (KeyError, TypeError, ValueError) as ex: LOGGER.error("Error during midnight data rotation in hourly update: %s", ex) - def get_all_intervals(self) -> list[dict]: - """Return a combined, sorted list of all price intervals for yesterday, today, and tomorrow.""" - if not self.data: - return [] - price_info = self.data["data"]["viewer"]["homes"][0]["currentSubscription"]["priceInfo"] - all_prices = price_info.get("yesterday", []) + price_info.get("today", []) + price_info.get("tomorrow", []) - return sorted(all_prices, key=lambda p: p["startsAt"]) - - def get_interval_granularity(self) -> int | None: - """Return the interval granularity in minutes (e.g., 15 or 60) for today's data.""" - if not self.data: - return None - price_info = self.data["data"]["viewer"]["homes"][0]["currentSubscription"]["priceInfo"] - today_prices = price_info.get("today", []) - from .sensor import detect_interval_granularity - - return detect_interval_granularity(today_prices) if today_prices else None - - def get_current_interval_data(self) -> dict | None: - """Return the price data for the current interval.""" - if not self.data: - return None - price_info = self.data["data"]["viewer"]["homes"][0]["currentSubscription"]["priceInfo"] - now = dt_util.now() - interval_length = self.get_interval_granularity() - from .sensor import find_price_data_for_interval - - return find_price_data_for_interval(price_info, now, interval_length) - - def get_combined_price_info(self) -> dict: - """Return a dict with all intervals under a single key 'all'.""" - return {"all": self.get_all_intervals()} - - def is_tomorrow_data_available(self) -> bool | None: - """Return True if tomorrow's data is fully available, False if not, None if unknown.""" - if not self.data: - return None - price_info = self.data["data"]["viewer"]["homes"][0]["currentSubscription"]["priceInfo"] - tomorrow_prices = price_info.get("tomorrow", []) - interval_count = len(tomorrow_prices) - # Use the same logic as in binary_sensor.py - min_tomorrow_intervals_hourly = 24 - min_tomorrow_intervals_15min = 96 - tomorrow_interval_counts = {min_tomorrow_intervals_hourly, min_tomorrow_intervals_15min} - return interval_count in tomorrow_interval_counts - - async def _async_initialize(self) -> None: - """Load stored data.""" - stored = await self._store.async_load() - if stored is None: - LOGGER.warning("No cache file found or cache is empty on startup.") - else: - LOGGER.debug("Loading stored data: %s", stored) - - if stored: - # Load cached data - self._cached_price_data = cast("TibberPricesData", stored.get("price_data")) - self._cached_rating_data_hourly = cast("TibberPricesData", stored.get("rating_data_hourly")) - self._cached_rating_data_daily = cast("TibberPricesData", stored.get("rating_data_daily")) - self._cached_rating_data_monthly = cast("TibberPricesData", stored.get("rating_data_monthly")) - - # Recover timestamps - self._last_price_update = self._recover_timestamp(self._cached_price_data, stored.get("last_price_update")) - self._last_rating_update_hourly = self._recover_timestamp( - self._cached_rating_data_hourly, - stored.get("last_rating_update_hourly"), - "hourly", - ) - self._last_rating_update_daily = self._recover_timestamp( - self._cached_rating_data_daily, - stored.get("last_rating_update_daily"), - "daily", - ) - self._last_rating_update_monthly = self._recover_timestamp( - self._cached_rating_data_monthly, - stored.get("last_rating_update_monthly"), - "monthly", - ) - - LOGGER.debug( - "Loaded stored cache data - Price update: %s, Rating hourly: %s, daily: %s, monthly: %s", - self._last_price_update, - self._last_rating_update_hourly, - self._last_rating_update_daily, - self._last_rating_update_monthly, - ) - - # Defensive: warn if any required data is missing - if self._cached_price_data is None: - LOGGER.warning("Cached price data missing after cache load!") - if self._last_price_update is None: - LOGGER.warning("Price update timestamp missing after cache load!") - else: - LOGGER.info("No cache loaded; will fetch fresh data on first update.") - @callback def _recover_timestamp( self, - data: TibberPricesData | None, + data: dict | None, stored_timestamp: str | None, rating_type: str | None = None, ) -> datetime | None: diff --git a/custom_components/tibber_prices/sensor.py b/custom_components/tibber_prices/sensor.py index e1975b5..ff1f896 100644 --- a/custom_components/tibber_prices/sensor.py +++ b/custom_components/tibber_prices/sensor.py @@ -310,8 +310,7 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity): """Get price for current hour or with offset.""" if not self.coordinator.data: return None - - price_info = self.coordinator.data["data"]["viewer"]["homes"][0]["currentSubscription"]["priceInfo"] + price_info = self.coordinator.data["priceInfo"] # Use HomeAssistant's dt_util to get the current time in the user's timezone now = dt_util.now() @@ -408,7 +407,7 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity): if not self.coordinator.data: return None - price_info = self.coordinator.data["data"]["viewer"]["homes"][0]["currentSubscription"]["priceInfo"] + price_info = self.coordinator.data["priceInfo"] today_prices = price_info.get("today", []) if not today_prices: return None @@ -509,12 +508,11 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity): self._last_rating_difference = None self._last_rating_level = None return None - subscription = self.coordinator.data["data"]["viewer"]["homes"][0]["currentSubscription"] - price_rating = subscription.get("priceRating", {}) or {} + price_rating = self.coordinator.data.get("priceRating", {}) now = dt_util.now() - rating_data = price_rating.get(rating_type, {}) - entries = rating_data.get("entries", []) if rating_data else [] - entry = self._find_rating_entry(entries, now, rating_type, dict(subscription)) + # In the new flat format, price_rating[rating_type] is a list of entries + entries = price_rating.get(rating_type, []) + entry = self._find_rating_entry(entries, now, rating_type, dict(self.coordinator.data)) if entry: difference = entry.get("difference") level = entry.get("level") @@ -530,7 +528,7 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity): if not self.coordinator.data: return None - price_info = self.coordinator.data["data"]["viewer"]["homes"][0]["currentSubscription"]["priceInfo"] + price_info = self.coordinator.data["priceInfo"] latest_timestamp = None for day in ["today", "tomorrow"]: @@ -565,10 +563,8 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity): if not self.coordinator.data: return None - price_info = self.coordinator.data["data"]["viewer"]["homes"][0]["currentSubscription"]["priceInfo"] - price_rating = ( - self.coordinator.data["data"]["viewer"]["homes"][0]["currentSubscription"].get("priceRating", {}) or {} - ) + price_info = self.coordinator.data["priceInfo"] + price_rating = self.coordinator.data.get("priceRating", {}) # Determine data granularity from the current price data today_prices = price_info.get("today", []) @@ -878,7 +874,7 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity): # Add timestamp for next interval price sensors if self.entity_description.key in ["next_interval_price", "next_interval_price_eur"]: # Get the next interval's data - price_info = self.coordinator.data["data"]["viewer"]["homes"][0]["currentSubscription"]["priceInfo"] + price_info = self.coordinator.data["priceInfo"] today_prices = price_info.get("today", []) data_granularity = detect_interval_granularity(today_prices) if today_prices else MINUTES_PER_INTERVAL now = dt_util.now() @@ -916,7 +912,7 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity): def _add_statistics_attributes(self, attributes: dict) -> None: """Add attributes for statistics, rating, and diagnostic sensors.""" key = self.entity_description.key - price_info = self.coordinator.data["data"]["viewer"]["homes"][0]["currentSubscription"]["priceInfo"] + price_info = self.coordinator.data["priceInfo"] now = dt_util.now() if key == "price_rating": today_prices = price_info.get("today", [])