refactoring

This commit is contained in:
Julian Pawlowski 2025-05-20 10:41:01 +00:00
parent da4ef5675a
commit adc11b0e4d
4 changed files with 245 additions and 375 deletions

View file

@ -269,6 +269,27 @@ def _transform_price_info(data: dict) -> dict:
return data
def _flatten_price_info(subscription: dict) -> dict:
"""Extract and flatten priceInfo from subscription."""
price_info = subscription.get("priceInfo", {})
return {
"yesterday": price_info.get("yesterday", []),
"today": price_info.get("today", []),
"tomorrow": price_info.get("tomorrow", []),
}
def _flatten_price_rating(subscription: dict) -> dict:
"""Extract and flatten priceRating from subscription."""
price_rating = subscription.get("priceRating", {})
return {
"hourly": price_rating.get("hourly", {}).get("entries", []),
"daily": price_rating.get("daily", {}).get("entries", []),
"monthly": price_rating.get("monthly", {}).get("entries", []),
"thresholdPercentages": price_rating.get("thresholdPercentages"),
}
class TibberPricesApiClient:
"""Tibber API Client."""
@ -314,9 +335,9 @@ class TibberPricesApiClient:
query_type=QueryType.VIEWER,
)
async def async_get_price_info(self) -> Any:
"""Get price info data including today, tomorrow and last 48 hours."""
return await self._api_wrapper(
async def async_get_price_info(self) -> dict:
"""Get price info data in flat format."""
response = await self._api_wrapper(
data={
"query": """
{viewer{homes{id,currentSubscription{priceInfo{
@ -329,10 +350,16 @@ class TibberPricesApiClient:
},
query_type=QueryType.PRICE_INFO,
)
# response is already transformed, but we want flat
try:
subscription = response["viewer"]["homes"][0]["currentSubscription"]
except KeyError:
subscription = response["data"]["viewer"]["homes"][0]["currentSubscription"]
return {"priceInfo": _flatten_price_info(subscription)}
async def async_get_daily_price_rating(self) -> Any:
"""Get daily price rating data."""
return await self._api_wrapper(
async def async_get_daily_price_rating(self) -> dict:
"""Get daily price rating data in flat format."""
response = await self._api_wrapper(
data={
"query": """
{viewer{homes{id,currentSubscription{priceRating{
@ -345,10 +372,20 @@ class TibberPricesApiClient:
},
query_type=QueryType.DAILY_RATING,
)
try:
subscription = response["viewer"]["homes"][0]["currentSubscription"]
except KeyError:
subscription = response["data"]["viewer"]["homes"][0]["currentSubscription"]
return {
"priceRating": {
"daily": _flatten_price_rating(subscription)["daily"],
"thresholdPercentages": _flatten_price_rating(subscription)["thresholdPercentages"],
}
}
async def async_get_hourly_price_rating(self) -> Any:
"""Get hourly price rating data."""
return await self._api_wrapper(
async def async_get_hourly_price_rating(self) -> dict:
"""Get hourly price rating data in flat format."""
response = await self._api_wrapper(
data={
"query": """
{viewer{homes{id,currentSubscription{priceRating{
@ -361,10 +398,20 @@ class TibberPricesApiClient:
},
query_type=QueryType.HOURLY_RATING,
)
try:
subscription = response["viewer"]["homes"][0]["currentSubscription"]
except KeyError:
subscription = response["data"]["viewer"]["homes"][0]["currentSubscription"]
return {
"priceRating": {
"hourly": _flatten_price_rating(subscription)["hourly"],
"thresholdPercentages": _flatten_price_rating(subscription)["thresholdPercentages"],
}
}
async def async_get_monthly_price_rating(self) -> Any:
"""Get monthly price rating data."""
return await self._api_wrapper(
async def async_get_monthly_price_rating(self) -> dict:
"""Get monthly price rating data in flat format."""
response = await self._api_wrapper(
data={
"query": """
{viewer{homes{id,currentSubscription{priceRating{
@ -377,45 +424,33 @@ class TibberPricesApiClient:
},
query_type=QueryType.MONTHLY_RATING,
)
try:
subscription = response["viewer"]["homes"][0]["currentSubscription"]
except KeyError:
subscription = response["data"]["viewer"]["homes"][0]["currentSubscription"]
return {
"priceRating": {
"monthly": _flatten_price_rating(subscription)["monthly"],
"thresholdPercentages": _flatten_price_rating(subscription)["thresholdPercentages"],
}
}
async def async_get_data(self) -> Any:
"""Get all data from the API by combining multiple queries."""
# Get all data concurrently
async def async_get_data(self) -> dict:
"""Get all data from the API by combining multiple queries in flat format."""
price_info = await self.async_get_price_info()
daily_rating = await self.async_get_daily_price_rating()
hourly_rating = await self.async_get_hourly_price_rating()
monthly_rating = await self.async_get_monthly_price_rating()
# Extract the base paths to make the code more readable
def get_base_path(response: dict) -> dict:
"""Get the base subscription path from the response."""
return response["viewer"]["homes"][0]["currentSubscription"]
def get_rating_data(response: dict) -> dict:
"""Get the price rating data from the response."""
return get_base_path(response)["priceRating"]
price_info_data = get_base_path(price_info)["priceInfo"]
# Combine the results
# Merge all into one flat dict
price_rating = {
"thresholdPercentages": daily_rating["priceRating"].get("thresholdPercentages"),
"daily": daily_rating["priceRating"].get("daily", []),
"hourly": hourly_rating["priceRating"].get("hourly", []),
"monthly": monthly_rating["priceRating"].get("monthly", []),
}
return {
"data": {
"viewer": {
"homes": [
{
"currentSubscription": {
"priceInfo": price_info_data,
"priceRating": {
"thresholdPercentages": get_rating_data(daily_rating)["thresholdPercentages"],
"daily": get_rating_data(daily_rating)["daily"],
"hourly": get_rating_data(hourly_rating)["hourly"],
"monthly": get_rating_data(monthly_rating)["monthly"],
},
}
}
]
}
}
"priceInfo": price_info["priceInfo"],
"priceRating": price_rating,
}
async def async_set_title(self, value: str) -> Any:

View file

@ -150,7 +150,7 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
"""Return True if tomorrow's data is fully available, False if not, None if unknown."""
if not self.coordinator.data:
return None
price_info = self.coordinator.data["data"]["viewer"]["homes"][0]["currentSubscription"]["priceInfo"]
price_info = self.coordinator.data["priceInfo"]
tomorrow_prices = price_info.get("tomorrow", [])
interval_count = len(tomorrow_prices)
if interval_count in TOMORROW_INTERVAL_COUNTS:
@ -163,7 +163,7 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
"""Return attributes for tomorrow_data_available binary sensor."""
if not self.coordinator.data:
return None
price_info = self.coordinator.data["data"]["viewer"]["homes"][0]["currentSubscription"]["priceInfo"]
price_info = self.coordinator.data["priceInfo"]
tomorrow_prices = price_info.get("tomorrow", [])
interval_count = len(tomorrow_prices)
if interval_count == 0:
@ -195,7 +195,7 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
if not self.coordinator.data:
return None
price_info = self.coordinator.data["data"]["viewer"]["homes"][0]["currentSubscription"]["priceInfo"]
price_info = self.coordinator.data["priceInfo"]
today_prices = price_info.get("today", [])
if not today_prices:
@ -488,7 +488,7 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
"""Get price interval attributes with support for 15-minute intervals and period grouping."""
if not self.coordinator.data:
return None
price_info = self.coordinator.data["data"]["viewer"]["homes"][0]["currentSubscription"]["priceInfo"]
price_info = self.coordinator.data["priceInfo"]
yesterday_prices = price_info.get("yesterday", [])
today_prices = price_info.get("today", [])
tomorrow_prices = price_info.get("tomorrow", [])
@ -530,7 +530,7 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
if not self.coordinator.data:
return None
price_info = self.coordinator.data["data"]["viewer"]["homes"][0]["currentSubscription"]["priceInfo"]
price_info = self.coordinator.data["priceInfo"]
today_prices = price_info.get("today", [])
if not today_prices:

View file

@ -5,7 +5,7 @@ from __future__ import annotations
import asyncio
import logging
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Any, Final, TypedDict, cast
from typing import TYPE_CHECKING, Any, Final, cast
import homeassistant.util.dt as dt_util
from homeassistant.core import HomeAssistant, callback
@ -22,8 +22,6 @@ from .api import (
from .const import DOMAIN, LOGGER
if TYPE_CHECKING:
from collections.abc import Callable
from .data import TibberPricesConfigEntry
_LOGGER = logging.getLogger(__name__)
@ -38,36 +36,6 @@ UPDATE_FAILED_MSG: Final = "Update failed"
AUTH_FAILED_MSG: Final = "Authentication failed"
class TibberPricesPriceInfo(TypedDict):
"""Type for price info data structure."""
today: list[dict[str, Any]]
tomorrow: list[dict[str, Any]]
yesterday: list[dict[str, Any]]
class TibberPricesPriceRating(TypedDict):
"""Type for price rating data structure."""
thresholdPercentages: dict[str, float] | None
hourly: dict[str, Any] | None
daily: dict[str, Any] | None
monthly: dict[str, Any] | None
class TibberPricesSubscriptionData(TypedDict):
"""Type for price info data structure."""
priceInfo: TibberPricesPriceInfo
priceRating: TibberPricesPriceRating
class TibberPricesData(TypedDict):
"""Type for Tibber API response data structure."""
data: dict[str, dict[str, list[dict[str, TibberPricesSubscriptionData]]]]
@callback
def _raise_no_data() -> None:
"""Raise error when no data is available."""
@ -76,15 +44,14 @@ def _raise_no_data() -> None:
@callback
def _get_latest_timestamp_from_prices(
price_data: TibberPricesData | None,
price_data: dict | None,
) -> datetime | None:
"""Get the latest timestamp from price data."""
if not price_data or "data" not in price_data:
if not price_data or "priceInfo" not in price_data:
return None
try:
subscription = price_data["data"]["viewer"]["homes"][0]["currentSubscription"]
price_info = subscription["priceInfo"]
price_info = price_data["priceInfo"]
latest_timestamp = None
# Check today's prices
@ -111,20 +78,19 @@ def _get_latest_timestamp_from_prices(
@callback
def _get_latest_timestamp_from_rating(
rating_data: TibberPricesData | None,
rating_data: dict | None,
) -> datetime | None:
"""Get the latest timestamp from rating data."""
if not rating_data or "data" not in rating_data:
if not rating_data or "priceRating" not in rating_data:
return None
try:
subscription = rating_data["data"]["viewer"]["homes"][0]["currentSubscription"]
price_rating = subscription["priceRating"]
price_rating = rating_data["priceRating"]
latest_timestamp = None
# Check all rating types (hourly, daily, monthly)
for rating_type in ["hourly", "daily", "monthly"]:
if rating_entries := price_rating.get(rating_type, {}).get("entries", []):
if rating_entries := price_rating.get(rating_type, []):
for entry in rating_entries:
if time := entry.get("time"):
timestamp = dt_util.parse_datetime(time)
@ -137,7 +103,7 @@ def _get_latest_timestamp_from_rating(
# https://developers.home-assistant.io/docs/integration_fetching_data#coordinated-single-api-poll-for-data-for-all-entities
class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[TibberPricesData]):
class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict]):
"""Class to manage fetching data from the API."""
def __init__(
@ -152,10 +118,10 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[TibberPricesData])
self.config_entry = entry
storage_key = f"{DOMAIN}.{entry.entry_id}"
self._store = Store(hass, STORAGE_VERSION, storage_key)
self._cached_price_data: TibberPricesData | None = None
self._cached_rating_data_hourly: TibberPricesData | None = None
self._cached_rating_data_daily: TibberPricesData | None = None
self._cached_rating_data_monthly: TibberPricesData | None = None
self._cached_price_data: dict | None = None
self._cached_rating_data_hourly: dict | None = None
self._cached_rating_data_daily: dict | None = None
self._cached_rating_data_monthly: dict | None = None
self._last_price_update: datetime | None = None
self._last_rating_update_hourly: datetime | None = None
self._last_rating_update_daily: datetime | None = None
@ -191,7 +157,7 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[TibberPricesData])
# Then do a regular refresh
await self.async_refresh()
async def _async_update_data(self) -> TibberPricesData:
async def _async_update_data(self) -> dict:
"""
Fetch new state data for the coordinator.
@ -265,7 +231,7 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[TibberPricesData])
else:
return result
async def _handle_conditional_update(self, current_time: datetime) -> TibberPricesData:
async def _handle_conditional_update(self, current_time: datetime) -> dict:
"""Handle conditional update based on update conditions."""
# Simplified conditional update checking
update_conditions = self._check_update_conditions(current_time)
@ -309,7 +275,7 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[TibberPricesData])
),
}
async def _fetch_all_data(self) -> TibberPricesData:
async def _fetch_all_data(self) -> dict:
"""
Fetch all data from the API without checking update conditions.
@ -355,7 +321,7 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[TibberPricesData])
_raise_no_data()
# Only update cache if we have valid data
self._cached_price_data = cast("TibberPricesData", new_data["price_data"])
self._cached_price_data = cast("dict", new_data["price_data"])
self._last_price_update = current_time
# Update rating data cache only for types that were successfully fetched
@ -371,61 +337,21 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[TibberPricesData])
return self._merge_all_cached_data()
@callback
def _update_rating_cache(self, rating_type: str, rating_data: TibberPricesData, current_time: datetime) -> None:
def _update_rating_cache(self, rating_type: str, rating_data: dict, current_time: datetime) -> None:
"""Update the rating cache for a specific rating type."""
if rating_type == "hourly":
self._cached_rating_data_hourly = cast("TibberPricesData", rating_data)
self._cached_rating_data_hourly = cast("dict", rating_data)
self._last_rating_update_hourly = current_time
elif rating_type == "daily":
self._cached_rating_data_daily = cast("TibberPricesData", rating_data)
self._cached_rating_data_daily = cast("dict", rating_data)
self._last_rating_update_daily = current_time
else: # monthly
self._cached_rating_data_monthly = cast("TibberPricesData", rating_data)
self._cached_rating_data_monthly = cast("dict", rating_data)
self._last_rating_update_monthly = current_time
LOGGER.debug("Updated %s rating data cache at %s", rating_type, current_time)
async def _store_cache(self) -> None:
"""Store cache data."""
def _recover_and_log_timestamp(
data: TibberPricesData | None,
last_update: datetime | None,
get_latest_fn: Callable[[TibberPricesData], datetime | None],
label: str,
) -> datetime | None:
if data and not last_update:
latest_timestamp = get_latest_fn(data)
if latest_timestamp:
LOGGER.debug("Setting missing %s timestamp to: %s", label, latest_timestamp)
return latest_timestamp
LOGGER.warning("Could not recover %s timestamp from data!", label)
return last_update
self._last_price_update = _recover_and_log_timestamp(
self._cached_price_data,
self._last_price_update,
_get_latest_timestamp_from_prices,
"price update",
)
self._last_rating_update_hourly = _recover_and_log_timestamp(
self._cached_rating_data_hourly,
self._last_rating_update_hourly,
lambda d: self._get_latest_timestamp_from_rating_type(d, "hourly"),
"hourly rating",
)
self._last_rating_update_daily = _recover_and_log_timestamp(
self._cached_rating_data_daily,
self._last_rating_update_daily,
lambda d: self._get_latest_timestamp_from_rating_type(d, "daily"),
"daily rating",
)
self._last_rating_update_monthly = _recover_and_log_timestamp(
self._cached_rating_data_monthly,
self._last_rating_update_monthly,
lambda d: self._get_latest_timestamp_from_rating_type(d, "monthly"),
"monthly rating",
)
"""Store cache data in flat format."""
data = {
"price_data": self._cached_price_data,
"rating_data_hourly": self._cached_rating_data_hourly,
@ -446,7 +372,6 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[TibberPricesData])
"Storing cache data with timestamps: %s",
{k: v for k, v in data.items() if k.startswith("last_")},
)
# Defensive: warn if any required data is missing before saving
if data["price_data"] is None:
LOGGER.warning("Attempting to store cache with missing price_data!")
if data["last_price_update"] is None:
@ -514,7 +439,7 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[TibberPricesData])
def _should_update_rating_type(
self,
current_time: datetime,
cached_data: TibberPricesData | None,
cached_data: dict | None,
last_update: datetime | None,
rating_type: str,
) -> bool:
@ -589,56 +514,30 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[TibberPricesData])
@callback
def _extract_price_data(self, data: dict) -> dict:
"""Extract price data for caching."""
"""Extract price data for caching in flat format."""
try:
# Try to access data in the transformed structure first
try:
price_info = data["viewer"]["homes"][0]["currentSubscription"]["priceInfo"]
except KeyError:
# If that fails, try the raw data structure
price_info = data["data"]["viewer"]["homes"][0]["currentSubscription"]["priceInfo"]
# Ensure we have all required fields
price_info = data["priceInfo"]
extracted_price_info = {
"yesterday": price_info.get("yesterday", []),
"today": price_info.get("today", []),
"tomorrow": price_info.get("tomorrow", []),
"yesterday": price_info.get("yesterday", []),
}
except (KeyError, IndexError) as ex:
except (KeyError, IndexError, TypeError) as ex:
LOGGER.error("Error extracting price data: %s", ex)
return {
"data": {
"viewer": {
"homes": [
{
"currentSubscription": {
"priceInfo": {
"today": [],
"tomorrow": [],
"yesterday": [],
}
}
}
]
}
}
}
return {"data": {"viewer": {"homes": [{"currentSubscription": {"priceInfo": extracted_price_info}}]}}}
extracted_price_info = {"yesterday": [], "today": [], "tomorrow": []}
return extracted_price_info
@callback
def _get_latest_timestamp_from_rating_type(
self, rating_data: TibberPricesData | None, rating_type: str
) -> datetime | None:
def _get_latest_timestamp_from_rating_type(self, rating_data: dict | None, rating_type: str) -> datetime | None:
"""Get the latest timestamp from a specific rating type."""
if not rating_data or "data" not in rating_data:
if not rating_data or "priceRating" not in rating_data:
return None
try:
subscription = rating_data["data"]["viewer"]["homes"][0]["currentSubscription"]
price_rating = subscription["priceRating"]
price_rating = rating_data["priceRating"]
result = None
if rating_entries := price_rating.get(rating_type, {}).get("entries", []):
if rating_entries := price_rating.get(rating_type, []):
for entry in rating_entries:
if time := entry.get("time"):
timestamp = dt_util.parse_datetime(time)
@ -649,99 +548,135 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[TibberPricesData])
return result
async def _get_rating_data_for_type(self, rating_type: str) -> dict:
"""Get fresh rating data for a specific type."""
"""Get fresh rating data for a specific type in flat format."""
client = self.config_entry.runtime_data.client
if rating_type == "hourly":
data = await client.async_get_hourly_price_rating()
elif rating_type == "daily":
data = await client.async_get_daily_price_rating()
else: # monthly
else:
data = await client.async_get_monthly_price_rating()
# Accept both {"priceRating": {...}} and flat {rating_type: [...], ...} dicts
try:
# Try to access data in the transformed structure first
rating = data["viewer"]["homes"][0]["currentSubscription"]["priceRating"]
except KeyError:
try:
# If that fails, try the raw data structure
rating = data["data"]["viewer"]["homes"][0]["currentSubscription"]["priceRating"]
except KeyError as ex:
LOGGER.error("Failed to extract rating data: %s", ex)
raise TibberPricesApiClientError(
TibberPricesApiClientError.EMPTY_DATA_ERROR.format(query_type=rating_type)
) from ex
else:
return {
"data": {
"viewer": {
"homes": [
{
"currentSubscription": {
"priceRating": {
"thresholdPercentages": rating["thresholdPercentages"],
rating_type: rating[rating_type],
}
}
}
]
}
}
}
else:
return {
"data": {
"viewer": {
"homes": [
{
"currentSubscription": {
"priceRating": {
"thresholdPercentages": rating["thresholdPercentages"],
rating_type: rating[rating_type],
}
}
}
]
}
}
}
price_rating = data.get("priceRating", data)
threshold = price_rating.get("thresholdPercentages")
entries = price_rating.get(rating_type, [])
except KeyError as ex:
LOGGER.error("Failed to extract rating data (flat format): %s", ex)
raise TibberPricesApiClientError(
TibberPricesApiClientError.EMPTY_DATA_ERROR.format(query_type=rating_type)
) from ex
return {"priceRating": {rating_type: entries, "thresholdPercentages": threshold}}
@callback
def _merge_all_cached_data(self) -> TibberPricesData:
"""Merge all cached data."""
def _merge_all_cached_data(self) -> dict:
"""Merge all cached data into flat format."""
if not self._cached_price_data:
return cast("TibberPricesData", {})
return {}
# Wrap cached price data in 'priceInfo' only if not already wrapped
if "priceInfo" in self._cached_price_data:
merged = {"priceInfo": self._cached_price_data["priceInfo"]}
else:
merged = {"priceInfo": self._cached_price_data}
price_rating = {"hourly": [], "daily": [], "monthly": [], "thresholdPercentages": None}
for rating_type, cached in zip(
["hourly", "daily", "monthly"],
[self._cached_rating_data_hourly, self._cached_rating_data_daily, self._cached_rating_data_monthly],
strict=True,
):
if cached and "priceRating" in cached:
entries = cached["priceRating"].get(rating_type, [])
price_rating[rating_type] = entries
if not price_rating["thresholdPercentages"]:
price_rating["thresholdPercentages"] = cached["priceRating"].get("thresholdPercentages")
merged["priceRating"] = price_rating
return merged
# Start with price info
subscription = {
"priceInfo": self._cached_price_data["data"]["viewer"]["homes"][0]["currentSubscription"]["priceInfo"],
"priceRating": {
"thresholdPercentages": None,
},
}
async def _async_initialize(self) -> None:
"""Load stored data in flat format."""
stored = await self._store.async_load()
if stored is None:
LOGGER.warning("No cache file found or cache is empty on startup.")
else:
LOGGER.debug("Loading stored data: %s", stored)
if stored:
self._cached_price_data = stored.get("price_data")
self._cached_rating_data_hourly = stored.get("rating_data_hourly")
self._cached_rating_data_daily = stored.get("rating_data_daily")
self._cached_rating_data_monthly = stored.get("rating_data_monthly")
self._last_price_update = self._recover_timestamp(self._cached_price_data, stored.get("last_price_update"))
self._last_rating_update_hourly = self._recover_timestamp(
self._cached_rating_data_hourly,
stored.get("last_rating_update_hourly"),
"hourly",
)
self._last_rating_update_daily = self._recover_timestamp(
self._cached_rating_data_daily,
stored.get("last_rating_update_daily"),
"daily",
)
self._last_rating_update_monthly = self._recover_timestamp(
self._cached_rating_data_monthly,
stored.get("last_rating_update_monthly"),
"monthly",
)
LOGGER.debug(
"Loaded stored cache data - Price update: %s, Rating hourly: %s, daily: %s, monthly: %s",
self._last_price_update,
self._last_rating_update_hourly,
self._last_rating_update_daily,
self._last_rating_update_monthly,
)
if self._cached_price_data is None:
LOGGER.warning("Cached price data missing after cache load!")
if self._last_price_update is None:
LOGGER.warning("Price update timestamp missing after cache load!")
else:
LOGGER.info("No cache loaded; will fetch fresh data on first update.")
# Add rating data if available
rating_data = {
"hourly": self._cached_rating_data_hourly,
"daily": self._cached_rating_data_daily,
"monthly": self._cached_rating_data_monthly,
}
def get_all_intervals(self) -> list[dict]:
"""Return a combined, sorted list of all price intervals for yesterday, today, and tomorrow."""
if not self.data or "priceInfo" not in self.data:
return []
price_info = self.data["priceInfo"]
all_prices = price_info.get("yesterday", []) + price_info.get("today", []) + price_info.get("tomorrow", [])
return sorted(all_prices, key=lambda p: p["startsAt"])
for rating_type, data in rating_data.items():
if data and "data" in data:
rating = data["data"]["viewer"]["homes"][0]["currentSubscription"]["priceRating"]
def get_interval_granularity(self) -> int | None:
"""Return the interval granularity in minutes (e.g., 15 or 60) for today's data."""
if not self.data or "priceInfo" not in self.data:
return None
today_prices = self.data["priceInfo"].get("today", [])
from .sensor import detect_interval_granularity
# Set thresholdPercentages from any available rating data
if not subscription["priceRating"]["thresholdPercentages"]:
subscription["priceRating"]["thresholdPercentages"] = rating["thresholdPercentages"]
return detect_interval_granularity(today_prices) if today_prices else None
# Add the specific rating type data
subscription["priceRating"][rating_type] = rating[rating_type]
def get_current_interval_data(self) -> dict | None:
"""Return the price data for the current interval."""
if not self.data or "priceInfo" not in self.data:
return None
price_info = self.data["priceInfo"]
now = dt_util.now()
interval_length = self.get_interval_granularity()
from .sensor import find_price_data_for_interval
return cast(
"TibberPricesData",
{"data": {"viewer": {"homes": [{"currentSubscription": subscription}]}}},
)
return find_price_data_for_interval(price_info, now, interval_length)
def get_combined_price_info(self) -> dict:
"""Return a dict with all intervals under a single key 'all'."""
return {"all": self.get_all_intervals()}
def is_tomorrow_data_available(self) -> bool | None:
"""Return True if tomorrow's data is fully available, False if not, None if unknown."""
if not self.data or "priceInfo" not in self.data:
return None
tomorrow_prices = self.data["priceInfo"].get("tomorrow", [])
interval_count = len(tomorrow_prices)
min_tomorrow_intervals_hourly = 24
min_tomorrow_intervals_15min = 96
tomorrow_interval_counts = {min_tomorrow_intervals_hourly, min_tomorrow_intervals_15min}
return interval_count in tomorrow_interval_counts
async def async_request_refresh(self) -> None:
"""
@ -756,9 +691,9 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[TibberPricesData])
self._force_update = True
await self.async_refresh()
def _transform_api_response(self, data: dict[str, Any]) -> TibberPricesData:
def _transform_api_response(self, data: dict[str, Any]) -> dict:
"""Transform API response to coordinator data format."""
return cast("TibberPricesData", data)
return cast("dict", data)
async def _perform_midnight_rotation(self) -> None:
"""
@ -775,8 +710,7 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[TibberPricesData])
async with self._rotation_lock:
try:
subscription = self._cached_price_data["data"]["viewer"]["homes"][0]["currentSubscription"]
price_info = subscription["priceInfo"]
price_info = self._cached_price_data["priceInfo"]
# Save current data state for logging
today_count = len(price_info.get("today", []))
@ -823,105 +757,10 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[TibberPricesData])
except (KeyError, TypeError, ValueError) as ex:
LOGGER.error("Error during midnight data rotation in hourly update: %s", ex)
def get_all_intervals(self) -> list[dict]:
"""Return a combined, sorted list of all price intervals for yesterday, today, and tomorrow."""
if not self.data:
return []
price_info = self.data["data"]["viewer"]["homes"][0]["currentSubscription"]["priceInfo"]
all_prices = price_info.get("yesterday", []) + price_info.get("today", []) + price_info.get("tomorrow", [])
return sorted(all_prices, key=lambda p: p["startsAt"])
def get_interval_granularity(self) -> int | None:
"""Return the interval granularity in minutes (e.g., 15 or 60) for today's data."""
if not self.data:
return None
price_info = self.data["data"]["viewer"]["homes"][0]["currentSubscription"]["priceInfo"]
today_prices = price_info.get("today", [])
from .sensor import detect_interval_granularity
return detect_interval_granularity(today_prices) if today_prices else None
def get_current_interval_data(self) -> dict | None:
"""Return the price data for the current interval."""
if not self.data:
return None
price_info = self.data["data"]["viewer"]["homes"][0]["currentSubscription"]["priceInfo"]
now = dt_util.now()
interval_length = self.get_interval_granularity()
from .sensor import find_price_data_for_interval
return find_price_data_for_interval(price_info, now, interval_length)
def get_combined_price_info(self) -> dict:
"""Return a dict with all intervals under a single key 'all'."""
return {"all": self.get_all_intervals()}
def is_tomorrow_data_available(self) -> bool | None:
"""Return True if tomorrow's data is fully available, False if not, None if unknown."""
if not self.data:
return None
price_info = self.data["data"]["viewer"]["homes"][0]["currentSubscription"]["priceInfo"]
tomorrow_prices = price_info.get("tomorrow", [])
interval_count = len(tomorrow_prices)
# Use the same logic as in binary_sensor.py
min_tomorrow_intervals_hourly = 24
min_tomorrow_intervals_15min = 96
tomorrow_interval_counts = {min_tomorrow_intervals_hourly, min_tomorrow_intervals_15min}
return interval_count in tomorrow_interval_counts
async def _async_initialize(self) -> None:
"""Load stored data."""
stored = await self._store.async_load()
if stored is None:
LOGGER.warning("No cache file found or cache is empty on startup.")
else:
LOGGER.debug("Loading stored data: %s", stored)
if stored:
# Load cached data
self._cached_price_data = cast("TibberPricesData", stored.get("price_data"))
self._cached_rating_data_hourly = cast("TibberPricesData", stored.get("rating_data_hourly"))
self._cached_rating_data_daily = cast("TibberPricesData", stored.get("rating_data_daily"))
self._cached_rating_data_monthly = cast("TibberPricesData", stored.get("rating_data_monthly"))
# Recover timestamps
self._last_price_update = self._recover_timestamp(self._cached_price_data, stored.get("last_price_update"))
self._last_rating_update_hourly = self._recover_timestamp(
self._cached_rating_data_hourly,
stored.get("last_rating_update_hourly"),
"hourly",
)
self._last_rating_update_daily = self._recover_timestamp(
self._cached_rating_data_daily,
stored.get("last_rating_update_daily"),
"daily",
)
self._last_rating_update_monthly = self._recover_timestamp(
self._cached_rating_data_monthly,
stored.get("last_rating_update_monthly"),
"monthly",
)
LOGGER.debug(
"Loaded stored cache data - Price update: %s, Rating hourly: %s, daily: %s, monthly: %s",
self._last_price_update,
self._last_rating_update_hourly,
self._last_rating_update_daily,
self._last_rating_update_monthly,
)
# Defensive: warn if any required data is missing
if self._cached_price_data is None:
LOGGER.warning("Cached price data missing after cache load!")
if self._last_price_update is None:
LOGGER.warning("Price update timestamp missing after cache load!")
else:
LOGGER.info("No cache loaded; will fetch fresh data on first update.")
@callback
def _recover_timestamp(
self,
data: TibberPricesData | None,
data: dict | None,
stored_timestamp: str | None,
rating_type: str | None = None,
) -> datetime | None:

View file

@ -310,8 +310,7 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity):
"""Get price for current hour or with offset."""
if not self.coordinator.data:
return None
price_info = self.coordinator.data["data"]["viewer"]["homes"][0]["currentSubscription"]["priceInfo"]
price_info = self.coordinator.data["priceInfo"]
# Use HomeAssistant's dt_util to get the current time in the user's timezone
now = dt_util.now()
@ -408,7 +407,7 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity):
if not self.coordinator.data:
return None
price_info = self.coordinator.data["data"]["viewer"]["homes"][0]["currentSubscription"]["priceInfo"]
price_info = self.coordinator.data["priceInfo"]
today_prices = price_info.get("today", [])
if not today_prices:
return None
@ -509,12 +508,11 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity):
self._last_rating_difference = None
self._last_rating_level = None
return None
subscription = self.coordinator.data["data"]["viewer"]["homes"][0]["currentSubscription"]
price_rating = subscription.get("priceRating", {}) or {}
price_rating = self.coordinator.data.get("priceRating", {})
now = dt_util.now()
rating_data = price_rating.get(rating_type, {})
entries = rating_data.get("entries", []) if rating_data else []
entry = self._find_rating_entry(entries, now, rating_type, dict(subscription))
# In the new flat format, price_rating[rating_type] is a list of entries
entries = price_rating.get(rating_type, [])
entry = self._find_rating_entry(entries, now, rating_type, dict(self.coordinator.data))
if entry:
difference = entry.get("difference")
level = entry.get("level")
@ -530,7 +528,7 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity):
if not self.coordinator.data:
return None
price_info = self.coordinator.data["data"]["viewer"]["homes"][0]["currentSubscription"]["priceInfo"]
price_info = self.coordinator.data["priceInfo"]
latest_timestamp = None
for day in ["today", "tomorrow"]:
@ -565,10 +563,8 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity):
if not self.coordinator.data:
return None
price_info = self.coordinator.data["data"]["viewer"]["homes"][0]["currentSubscription"]["priceInfo"]
price_rating = (
self.coordinator.data["data"]["viewer"]["homes"][0]["currentSubscription"].get("priceRating", {}) or {}
)
price_info = self.coordinator.data["priceInfo"]
price_rating = self.coordinator.data.get("priceRating", {})
# Determine data granularity from the current price data
today_prices = price_info.get("today", [])
@ -878,7 +874,7 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity):
# Add timestamp for next interval price sensors
if self.entity_description.key in ["next_interval_price", "next_interval_price_eur"]:
# Get the next interval's data
price_info = self.coordinator.data["data"]["viewer"]["homes"][0]["currentSubscription"]["priceInfo"]
price_info = self.coordinator.data["priceInfo"]
today_prices = price_info.get("today", [])
data_granularity = detect_interval_granularity(today_prices) if today_prices else MINUTES_PER_INTERVAL
now = dt_util.now()
@ -916,7 +912,7 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity):
def _add_statistics_attributes(self, attributes: dict) -> None:
"""Add attributes for statistics, rating, and diagnostic sensors."""
key = self.entity_description.key
price_info = self.coordinator.data["data"]["viewer"]["homes"][0]["currentSubscription"]["priceInfo"]
price_info = self.coordinator.data["priceInfo"]
now = dt_util.now()
if key == "price_rating":
today_prices = price_info.get("today", [])