This commit is contained in:
Julian Pawlowski 2025-05-20 20:44:35 +00:00
parent a375026e07
commit 0a11f38a1b
2 changed files with 239 additions and 328 deletions

View file

@ -92,6 +92,20 @@ Use the following order inside Python modules:
- All code should assume a clean, modern codebase and should target the latest stable Home Assistant version only, unless otherwise specified.
- If you believe backward compatibility might be required, **ask for clarification first** before adding any related code.
## Translations Policy
- All user-facing strings supported by Home Assistant's translation system **must** be defined in `/translations/en.json` and (if present) in other `/translations/*.json` language files.
- When adding or updating a translation key in `/translations/en.json`, **ensure that all other language files in `/translations/` are updated to match the same set of keys**. Non-English files may use placeholder values if no translation is available, but **must not omit** any keys present in `en.json`.
- Do **not** remove or rename translation keys without updating all language files accordingly.
- Never duplicate translation keys between `/translations/` and `/custom_translations/`.
- The `/custom_translations/` directory contains **supplemental translation files** for UI strings or other content not handled by the standard Home Assistant translation format.
- Only add strings to `/custom_translations/` if they are not supported by the standard Home Assistant translation system.
- Do **not** duplicate any string or translation key that could be handled in `/translations/`.
- When both exist, the standard Home Assistant translation in `/translations/` **always takes priority** over any supplemental entry in `/custom_translations/`.
- All translation files (both standard and custom) **must remain in sync** with the English base file (`en.json`) in their respective directory.
> ✅ Copilot tip: Whenever adding or changing user-facing strings, update both the main translation files in `/translations/` and the supplemental files in `/custom_translations/`, keeping them in sync and avoiding duplication.
## Data Structures
Use `@dataclass` for plain data containers where appropriate:

View file

@ -107,9 +107,8 @@ def _get_latest_timestamp_from_rating(
return latest_timestamp
# https://developers.home-assistant.io/docs/integration_fetching_data#coordinated-single-api-poll-for-data-for-all-entities
class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict]):
"""Class to manage fetching data from the API."""
"""Coordinator for fetching Tibber price data."""
def __init__(
self,
@ -134,12 +133,10 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict]):
self._scheduled_price_update: asyncio.Task | None = None
self._remove_update_listeners: list[Any] = []
self._force_update = False
self._rotation_lock = asyncio.Lock() # Add lock for data rotation operations
self._rotation_lock = asyncio.Lock()
self._last_attempted_price_update: datetime | None = None
self._random_update_minute: int | None = None
self._random_update_date: date | None = None
# Schedule updates at the start of every hour
self._remove_update_listeners.append(
async_track_time_change(hass, self._async_refresh_hourly, minute=0, second=0)
)
@ -150,39 +147,24 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict]):
for listener in self._remove_update_listeners:
listener()
async def _async_refresh_hourly(self, now: datetime | None = None) -> None:
"""
Handle the hourly refresh.
async def async_request_refresh(self) -> None:
    """Request an immediate refresh of the data.

    Sets the force-update flag so the next data update bypasses the
    conditional update checks; the flag is reset once the forced fetch
    has been performed.
    """
    self._force_update = True
    await self.async_refresh()
This will:
1. Check if it's midnight and handle rotation if needed
2. Then perform a regular refresh
"""
# If this is a midnight update (hour=0), handle data rotation first
async def _async_refresh_hourly(self, now: datetime | None = None) -> None:
    """Handle the hourly refresh.

    At exactly midnight the cached day buckets are rotated first, then the
    regular refresh runs, so both happen in a fixed order within one cycle.
    """
    at_midnight = bool(now) and (now.hour, now.minute) == (0, 0)
    if at_midnight:
        await self._perform_midnight_rotation()
    await self.async_refresh()
async def _async_update_data(self) -> dict:
"""
Fetch new state data for the coordinator.
This method will:
1. Initialize cached data if none exists
2. Force update if requested via async_request_refresh
3. Otherwise, check if update conditions are met
4. Use cached data as fallback if API call fails
"""
"""Fetch new state data for the coordinator."""
if self._cached_price_data is None:
await self._async_initialize()
try:
current_time = dt_util.now()
result = None
# If force update requested, fetch all data
if self._force_update:
LOGGER.debug(
"Force updating data",
@ -197,11 +179,10 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict]):
},
},
)
self._force_update = False # Reset force update flag
self._force_update = False
result = await self._fetch_all_data()
else:
result = await self._handle_conditional_update(current_time)
except TibberPricesApiClientAuthenticationError as exception:
LOGGER.error(
"Authentication failed",
@ -231,7 +212,6 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict]):
"Unexpected error",
extra={"error": str(exception), "error_type": "unexpected"},
)
if self._cached_price_data is not None:
LOGGER.info("Using cached data as fallback")
return self._merge_all_cached_data()
@ -241,122 +221,127 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict]):
async def _handle_conditional_update(self, current_time: datetime) -> dict:
    """Handle conditional update based on update conditions.

    Fetches fresh data when any update condition is met, otherwise serves
    the cache; falls back to a fetch when no cache exists at all.
    """
    flags = self._check_update_conditions(current_time)
    needs_fetch = any(flags.values())
    if needs_fetch:
        LOGGER.debug(
            "Updating data based on conditions",
            extra=flags,
        )
        return await self._fetch_all_data()
    have_cache = self._cached_price_data is not None
    if have_cache:
        LOGGER.debug("Using cached data")
        return self._merge_all_cached_data()
    LOGGER.debug("No cached data available, fetching new data")
    return await self._fetch_all_data()
@callback
def _check_update_conditions(self, current_time: datetime) -> dict[str, bool]:
"""Check all update conditions and return results as a dictionary."""
return {
"update_price": self._should_update_price_data(current_time),
"update_hourly": self._should_update_rating_type(
current_time,
self._cached_rating_data_hourly,
self._last_rating_update_hourly,
"hourly",
),
"update_daily": self._should_update_rating_type(
current_time,
self._cached_rating_data_daily,
self._last_rating_update_daily,
"daily",
),
"update_monthly": self._should_update_rating_type(
current_time,
self._cached_rating_data_monthly,
self._last_rating_update_monthly,
"monthly",
),
}
async def _fetch_all_data(self) -> dict:
    """Fetch all data from the API without checking update conditions.

    All fetches are attempted first; the cache is updated only with the
    pieces that were retrieved successfully. Cached data is used as a
    fallback when the price fetch fails or yields nothing.
    """
    now = dt_util.now()
    fetched: dict = {
        "price_data": None,
        "rating_data": {"hourly": None, "daily": None, "monthly": None},
    }
    try:
        raw_prices = await self._fetch_price_data()
        fetched["price_data"] = self._extract_data(raw_prices, "priceInfo", ("yesterday", "today", "tomorrow"))
        for kind in ("hourly", "daily", "monthly"):
            try:
                fetched["rating_data"][kind] = await self._get_rating_data_for_type(kind)
            except TibberPricesApiClientError as ex:
                # Per-type rating failures are tolerated; completeness of the
                # price data is validated below.
                LOGGER.error("Failed to fetch %s rating data: %s", kind, ex)
    except TibberPricesApiClientError as ex:
        LOGGER.error("Failed to fetch price data: %s", ex)
        if self._cached_price_data is not None:
            LOGGER.info("Using cached data as fallback after price data fetch failure")
            return self._merge_all_cached_data()
        raise
    if fetched["price_data"] is None:
        LOGGER.error("No price data available after fetch")
        if self._cached_price_data is not None:
            LOGGER.info("Using cached data as fallback due to missing price data")
            return self._merge_all_cached_data()
        _raise_no_data()
    # Commit to the cache only after a valid price payload is confirmed.
    self._cached_price_data = cast("dict", fetched["price_data"])
    self._last_price_update = now
    for kind, rating in fetched["rating_data"].items():
        if rating is not None:
            self._update_rating_cache(kind, rating, now)
    await self._store_cache()
    LOGGER.debug("Updated and stored all cache data at %s", now)
    return self._merge_all_cached_data()
@callback
def _update_rating_cache(self, rating_type: str, rating_data: dict, current_time: datetime) -> None:
    """Update the rating cache for a specific rating type.

    Stores the data and its update timestamp on the matching pair of
    ``_cached_rating_data_*`` / ``_last_rating_update_*`` attributes.
    """
    # Anything other than "hourly"/"daily" lands in the monthly bucket,
    # matching the original if/elif/else fall-through.
    bucket = rating_type if rating_type in ("hourly", "daily") else "monthly"
    setattr(self, f"_cached_rating_data_{bucket}", cast("dict", rating_data))
    setattr(self, f"_last_rating_update_{bucket}", current_time)
    LOGGER.debug("Updated %s rating data cache at %s", rating_type, current_time)
async def _fetch_price_data(self) -> dict:
    """Fetch fresh price data from API.

    Delegates to the API client stored on the config entry's runtime data.
    """
    client = self.config_entry.runtime_data.client
    return await client.async_get_price_info()
async def _get_rating_data_for_type(self, rating_type: str) -> dict:
    """Get fresh rating data for a specific type in flat format.

    Args:
        rating_type: One of ``"hourly"``, ``"daily"`` or ``"monthly"``.

    Returns:
        Dict shaped as ``{"priceRating": {<rating_type>: [...],
        "thresholdPercentages": ..., "currency": ...}}``.

    Raises:
        ValueError: If ``rating_type`` is not a known type.
        TibberPricesApiClientError: If the payload cannot be parsed.
    """
    client = self.config_entry.runtime_data.client
    method_map = {
        "hourly": client.async_get_hourly_price_rating,
        "daily": client.async_get_daily_price_rating,
        "monthly": client.async_get_monthly_price_rating,
    }
    fetch_method = method_map.get(rating_type)
    if not fetch_method:
        msg = f"Unknown rating type: {rating_type}"
        raise ValueError(msg)
    data = await fetch_method()
    try:
        # Some responses wrap the payload in "priceRating", others are flat.
        price_rating = data.get("priceRating", data)
        threshold = price_rating.get("thresholdPercentages")
        entries = price_rating.get(rating_type, [])
        currency = price_rating.get("currency")
    except (KeyError, AttributeError, TypeError) as ex:
        # BUGFIX: `.get()` never raises KeyError, so the original
        # `except KeyError` was dead code. Non-dict payloads raise
        # AttributeError/TypeError instead; catching them ensures malformed
        # responses surface as TibberPricesApiClientError as intended.
        LOGGER.error("Failed to extract rating data (flat format): %s", ex)
        raise TibberPricesApiClientError(
            TibberPricesApiClientError.EMPTY_DATA_ERROR.format(query_type=rating_type)
        ) from ex
    return {"priceRating": {rating_type: entries, "thresholdPercentages": threshold, "currency": currency}}
async def _async_initialize(self) -> None:
    """Load stored data in flat format.

    Restores cached price/rating data and their update timestamps from the
    persistent store, logging warnings when expected pieces are missing.
    """
    stored = await self._store.async_load()
    if stored is None:
        LOGGER.warning("No cache file found or cache is empty on startup.")
    else:
        LOGGER.debug("Loading stored data: %s", stored)
    if stored:
        # Restore the flat-format cache entries.
        self._cached_price_data = stored.get("price_data")
        self._cached_rating_data_hourly = stored.get("rating_data_hourly")
        self._cached_rating_data_daily = stored.get("rating_data_daily")
        self._cached_rating_data_monthly = stored.get("rating_data_monthly")
        # Timestamps are rebuilt via _recover_timestamp from the stored value
        # and the cached data — presumably it falls back to timestamps
        # embedded in the data when the stored value is absent; confirm
        # against _recover_timestamp's implementation.
        self._last_price_update = self._recover_timestamp(self._cached_price_data, stored.get("last_price_update"))
        self._last_rating_update_hourly = self._recover_timestamp(
            self._cached_rating_data_hourly,
            stored.get("last_rating_update_hourly"),
            "hourly",
        )
        self._last_rating_update_daily = self._recover_timestamp(
            self._cached_rating_data_daily,
            stored.get("last_rating_update_daily"),
            "daily",
        )
        self._last_rating_update_monthly = self._recover_timestamp(
            self._cached_rating_data_monthly,
            stored.get("last_rating_update_monthly"),
            "monthly",
        )
        LOGGER.debug(
            "Loaded stored cache data - Price update: %s, Rating hourly: %s, daily: %s, monthly: %s",
            self._last_price_update,
            self._last_rating_update_hourly,
            self._last_rating_update_daily,
            self._last_rating_update_monthly,
        )
        # Sanity checks: a loaded cache should contain price data and its
        # update timestamp; warn loudly if either is missing.
        if self._cached_price_data is None:
            LOGGER.warning("Cached price data missing after cache load!")
        if self._last_price_update is None:
            LOGGER.warning("Price update timestamp missing after cache load!")
    else:
        LOGGER.info("No cache loaded; will fetch fresh data on first update.")
async def _store_cache(self) -> None:
"""Store cache data in flat format."""
@ -390,18 +375,73 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict]):
except OSError as ex:
LOGGER.error("Failed to write cache to disk: %s", ex)
async def _perform_midnight_rotation(self) -> None:
    """Perform the data rotation at midnight within the hourly update process.

    Shifts today's cached prices to yesterday and tomorrow's to today,
    persists the result, and flags a forced update so fresh data for the
    new "tomorrow" is fetched.
    """
    LOGGER.info("Performing midnight data rotation as part of hourly update cycle")
    if not self._cached_price_data:
        LOGGER.debug("No cached price data available for midnight rotation")
        return
    # The lock serializes rotation against other operations that use it,
    # so the cache is not mutated concurrently during the shift.
    async with self._rotation_lock:
        try:
            price_info = self._cached_price_data["priceInfo"]
            # Capture pre-rotation counts for the debug log below.
            today_count = len(price_info.get("today", []))
            tomorrow_count = len(price_info.get("tomorrow", []))
            yesterday_count = len(price_info.get("yesterday", []))
            LOGGER.debug(
                "Before rotation - Yesterday: %d, Today: %d, Tomorrow: %d items",
                yesterday_count,
                today_count,
                tomorrow_count,
            )
            # Move today's data to yesterday.
            if today_data := price_info.get("today"):
                price_info["yesterday"] = today_data
            else:
                LOGGER.warning("No today's data available to move to yesterday")
            # Move tomorrow's data to today. If tomorrow's data is missing,
            # today's data is intentionally kept as-is to avoid data loss.
            if tomorrow_data := price_info.get("tomorrow"):
                price_info["today"] = tomorrow_data
                price_info["tomorrow"] = []
            else:
                LOGGER.warning("No tomorrow's data available to move to today")
            # Persist the rotated state before signalling completion.
            await self._store_cache()
            LOGGER.info(
                "Completed midnight rotation - Yesterday: %d, Today: %d, Tomorrow: %d items",
                len(price_info.get("yesterday", [])),
                len(price_info.get("today", [])),
                len(price_info.get("tomorrow", [])),
            )
            # Flag that new tomorrow's data needs to be fetched.
            self._force_update = True
        except (KeyError, TypeError, ValueError) as ex:
            LOGGER.error("Error during midnight data rotation in hourly update: %s", ex)
@callback
def _check_update_conditions(self, current_time: datetime) -> dict[str, bool]:
    """Check all update conditions and return results as a dictionary.

    Keys: ``update_price`` plus ``update_<type>`` for each rating type.
    """
    rating_specs = (
        ("hourly", self._cached_rating_data_hourly, self._last_rating_update_hourly),
        ("daily", self._cached_rating_data_daily, self._last_rating_update_daily),
        ("monthly", self._cached_rating_data_monthly, self._last_rating_update_monthly),
    )
    conditions: dict[str, bool] = {
        "update_price": self._should_update_price_data(current_time),
    }
    for name, cached, last_update in rating_specs:
        conditions[f"update_{name}"] = self._should_update_rating_type(
            current_time,
            cached,
            last_update,
            name,
        )
    return conditions
@callback
def _should_update_price_data(self, current_time: datetime) -> bool:
"""
Decide if price data should be updated.
- No fetch before 13:00.
- Randomized fetch minute in update window (13:00-15:00).
- Always fetch after 15:00 if tomorrow's data is missing.
- No fetch after midnight until 13:00.
"""
"""Decide if price data should be updated."""
current_hour = current_time.hour
# Check if tomorrow's data is available
tomorrow_prices = []
if self._cached_price_data and "priceInfo" in self._cached_price_data:
tomorrow_prices = self._cached_price_data["priceInfo"].get("tomorrow", [])
@ -409,20 +449,15 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict]):
min_tomorrow_intervals_hourly = 24
min_tomorrow_intervals_15min = 96
tomorrow_data_complete = interval_count in {min_tomorrow_intervals_hourly, min_tomorrow_intervals_15min}
should_update = False
# 1. Before 13:00: never fetch
if current_hour < PRICE_UPDATE_RANDOM_MIN_HOUR:
should_update = False
# 2. In update window (13:00-15:00): fetch at random minute, with min retry interval
elif PRICE_UPDATE_RANDOM_MIN_HOUR <= current_hour < PRICE_UPDATE_RANDOM_MAX_HOUR:
today = current_time.date()
if self._random_update_date != today or self._random_update_minute is None:
self._random_update_date = today
self._random_update_minute = secrets.randbelow(RANDOM_DELAY_MAX_MINUTES)
# Only fetch at the random minute
if current_time.minute == self._random_update_minute:
# Enforce minimum retry interval
if self._last_attempted_price_update:
since_last = current_time - self._last_attempted_price_update
if since_last < MIN_RETRY_INTERVAL:
@ -440,10 +475,8 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict]):
should_update = not tomorrow_data_complete
else:
should_update = False
# 3. After update window (15:00-00:00): always fetch if tomorrow's data is missing
elif PRICE_UPDATE_RANDOM_MAX_HOUR <= current_hour < END_OF_DAY_HOUR:
should_update = not tomorrow_data_complete
# 4. After midnight until 13:00: never fetch
else:
should_update = False
return should_update
@ -513,11 +546,6 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict]):
return True
return extra_check(current_time, latest_timestamp) if extra_check else False
async def _fetch_price_data(self) -> dict:
"""Fetch fresh price data from API."""
client = self.config_entry.runtime_data.client
return await client.async_get_price_info()
@callback
def _extract_data(self, data: dict, container_key: str, keys: tuple[str, ...]) -> dict:
"""Extract and harmonize data for caching in flat format."""
@ -530,56 +558,24 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict]):
return extracted
@callback
def _get_latest_timestamp_from_rating_type(self, rating_data: dict | None, rating_type: str) -> datetime | None:
"""Get the latest timestamp from a specific rating type."""
if not rating_data or "priceRating" not in rating_data:
return None
try:
price_rating = rating_data["priceRating"]
result = None
if rating_entries := price_rating.get(rating_type, []):
for entry in rating_entries:
if time := entry.get("time"):
timestamp = dt_util.parse_datetime(time)
if timestamp and (not result or timestamp > result):
result = timestamp
except (KeyError, IndexError, TypeError):
return None
return result
async def _get_rating_data_for_type(self, rating_type: str) -> dict:
"""Get fresh rating data for a specific type in flat format."""
client = self.config_entry.runtime_data.client
method_map = {
"hourly": client.async_get_hourly_price_rating,
"daily": client.async_get_daily_price_rating,
"monthly": client.async_get_monthly_price_rating,
}
fetch_method = method_map.get(rating_type)
if not fetch_method:
msg = f"Unknown rating type: {rating_type}"
raise ValueError(msg)
data = await fetch_method()
try:
price_rating = data.get("priceRating", data)
threshold = price_rating.get("thresholdPercentages")
entries = price_rating.get(rating_type, [])
currency = price_rating.get("currency")
except KeyError as ex:
LOGGER.error("Failed to extract rating data (flat format): %s", ex)
raise TibberPricesApiClientError(
TibberPricesApiClientError.EMPTY_DATA_ERROR.format(query_type=rating_type)
) from ex
return {"priceRating": {rating_type: entries, "thresholdPercentages": threshold, "currency": currency}}
def _update_rating_cache(self, rating_type: str, rating_data: dict, current_time: datetime) -> None:
"""Update the rating cache for a specific rating type."""
if rating_type == "hourly":
self._cached_rating_data_hourly = cast("dict", rating_data)
self._last_rating_update_hourly = current_time
elif rating_type == "daily":
self._cached_rating_data_daily = cast("dict", rating_data)
self._last_rating_update_daily = current_time
else:
self._cached_rating_data_monthly = cast("dict", rating_data)
self._last_rating_update_monthly = current_time
LOGGER.debug("Updated %s rating data cache at %s", rating_type, current_time)
@callback
def _merge_all_cached_data(self) -> dict:
"""Merge all cached data into flat format."""
if not self._cached_price_data:
return {}
# Wrap cached price data in 'priceInfo' only if not already wrapped
if "priceInfo" in self._cached_price_data:
merged = {"priceInfo": self._cached_price_data["priceInfo"]}
else:
@ -601,170 +597,6 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict]):
merged["currency"] = price_rating["currency"]
return merged
async def _async_initialize(self) -> None:
"""Load stored data in flat format."""
stored = await self._store.async_load()
if stored is None:
LOGGER.warning("No cache file found or cache is empty on startup.")
else:
LOGGER.debug("Loading stored data: %s", stored)
if stored:
self._cached_price_data = stored.get("price_data")
self._cached_rating_data_hourly = stored.get("rating_data_hourly")
self._cached_rating_data_daily = stored.get("rating_data_daily")
self._cached_rating_data_monthly = stored.get("rating_data_monthly")
self._last_price_update = self._recover_timestamp(self._cached_price_data, stored.get("last_price_update"))
self._last_rating_update_hourly = self._recover_timestamp(
self._cached_rating_data_hourly,
stored.get("last_rating_update_hourly"),
"hourly",
)
self._last_rating_update_daily = self._recover_timestamp(
self._cached_rating_data_daily,
stored.get("last_rating_update_daily"),
"daily",
)
self._last_rating_update_monthly = self._recover_timestamp(
self._cached_rating_data_monthly,
stored.get("last_rating_update_monthly"),
"monthly",
)
LOGGER.debug(
"Loaded stored cache data - Price update: %s, Rating hourly: %s, daily: %s, monthly: %s",
self._last_price_update,
self._last_rating_update_hourly,
self._last_rating_update_daily,
self._last_rating_update_monthly,
)
if self._cached_price_data is None:
LOGGER.warning("Cached price data missing after cache load!")
if self._last_price_update is None:
LOGGER.warning("Price update timestamp missing after cache load!")
else:
LOGGER.info("No cache loaded; will fetch fresh data on first update.")
def get_all_intervals(self) -> list[dict]:
"""Return a combined, sorted list of all price intervals for yesterday, today, and tomorrow."""
if not self.data or "priceInfo" not in self.data:
return []
price_info = self.data["priceInfo"]
all_prices = price_info.get("yesterday", []) + price_info.get("today", []) + price_info.get("tomorrow", [])
return sorted(all_prices, key=lambda p: p["startsAt"])
def get_interval_granularity(self) -> int | None:
"""Return the interval granularity in minutes (e.g., 15 or 60) for today's data."""
if not self.data or "priceInfo" not in self.data:
return None
today_prices = self.data["priceInfo"].get("today", [])
from .sensor import detect_interval_granularity
return detect_interval_granularity(today_prices) if today_prices else None
def get_current_interval_data(self) -> dict | None:
"""Return the price data for the current interval."""
if not self.data or "priceInfo" not in self.data:
return None
price_info = self.data["priceInfo"]
now = dt_util.now()
interval_length = self.get_interval_granularity()
from .sensor import find_price_data_for_interval
return find_price_data_for_interval(price_info, now, interval_length)
def get_combined_price_info(self) -> dict:
"""Return a dict with all intervals under a single key 'all'."""
return {"all": self.get_all_intervals()}
def is_tomorrow_data_available(self) -> bool | None:
"""Return True if tomorrow's data is fully available, False if not, None if unknown."""
if not self.data or "priceInfo" not in self.data:
return None
tomorrow_prices = self.data["priceInfo"].get("tomorrow", [])
interval_count = len(tomorrow_prices)
min_tomorrow_intervals_hourly = 24
min_tomorrow_intervals_15min = 96
tomorrow_interval_counts = {min_tomorrow_intervals_hourly, min_tomorrow_intervals_15min}
return interval_count in tomorrow_interval_counts
async def async_request_refresh(self) -> None:
"""
Request an immediate refresh of the data.
This method will:
1. Set the force update flag to trigger a fresh data fetch
2. Call async_refresh to perform the update
The force update flag will be reset after the update is complete.
"""
self._force_update = True
await self.async_refresh()
def _transform_api_response(self, data: dict[str, Any]) -> dict:
"""Transform API response to coordinator data format."""
return cast("dict", data)
async def _perform_midnight_rotation(self) -> None:
"""
Perform the data rotation at midnight within the hourly update process.
This ensures that data rotation completes before any regular updates run,
avoiding race conditions between midnight rotation and regular updates.
"""
LOGGER.info("Performing midnight data rotation as part of hourly update cycle")
if not self._cached_price_data:
LOGGER.debug("No cached price data available for midnight rotation")
return
async with self._rotation_lock:
try:
price_info = self._cached_price_data["priceInfo"]
# Save current data state for logging
today_count = len(price_info.get("today", []))
tomorrow_count = len(price_info.get("tomorrow", []))
yesterday_count = len(price_info.get("yesterday", []))
LOGGER.debug(
"Before rotation - Yesterday: %d, Today: %d, Tomorrow: %d items",
yesterday_count,
today_count,
tomorrow_count,
)
# Move today's data to yesterday
if today_data := price_info.get("today"):
price_info["yesterday"] = today_data
else:
LOGGER.warning("No today's data available to move to yesterday")
# Move tomorrow's data to today
if tomorrow_data := price_info.get("tomorrow"):
price_info["today"] = tomorrow_data
price_info["tomorrow"] = []
else:
LOGGER.warning("No tomorrow's data available to move to today")
# We don't clear today's data here to avoid potential data loss
# If somehow tomorrow's data isn't available, keep today's data
# This is different from the separate midnight rotation handler
# Store the rotated data
await self._store_cache()
# Log the new state
LOGGER.info(
"Completed midnight rotation - Yesterday: %d, Today: %d, Tomorrow: %d items",
len(price_info.get("yesterday", [])),
len(price_info.get("today", [])),
len(price_info.get("tomorrow", [])),
)
# Flag that we need to fetch new tomorrow's data
self._force_update = True
except (KeyError, TypeError, ValueError) as ex:
LOGGER.error("Error during midnight data rotation in hourly update: %s", ex)
@callback
def _recover_timestamp(
self,
@ -824,7 +656,6 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict]):
@callback
def _get_latest_price_timestamp(self, price_data: dict | None) -> datetime | None:
"""Get the latest timestamp from price data (today and tomorrow)."""
# Check both today and tomorrow, return the latest
today = self._get_latest_timestamp(price_data, "priceInfo", "today", "startsAt")
tomorrow = self._get_latest_timestamp(price_data, "priceInfo", "tomorrow", "startsAt")
if today and tomorrow:
@ -835,7 +666,6 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict]):
def _get_latest_rating_timestamp(self, rating_data: dict | None, rating_type: str | None = None) -> datetime | None:
"""Get the latest timestamp from rating data, optionally for a specific type."""
if not rating_type:
# Check all types and return the latest
latest = None
for rtype in ("hourly", "daily", "monthly"):
ts = self._get_latest_timestamp(rating_data, "priceRating", rtype, "time")
@ -843,3 +673,70 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict]):
latest = ts
return latest
return self._get_latest_timestamp(rating_data, "priceRating", rating_type, "time")
@callback
def _get_latest_timestamp_from_rating_type(self, rating_data: dict | None, rating_type: str) -> datetime | None:
    """Get the latest timestamp from a specific rating type.

    Returns None when the data is absent, malformed, or contains no
    parseable "time" values.
    """
    if not rating_data or "priceRating" not in rating_data:
        return None
    try:
        entries = rating_data["priceRating"].get(rating_type, [])
        parsed = (
            dt_util.parse_datetime(entry["time"])
            for entry in entries
            if entry.get("time")
        )
        timestamps = [ts for ts in parsed if ts is not None]
    except (KeyError, IndexError, TypeError):
        # Malformed entries: behave as if no timestamp was found.
        return None
    return max(timestamps, default=None)
def get_all_intervals(self) -> list[dict]:
    """Return a combined, sorted list of all price intervals for yesterday, today, and tomorrow."""
    if not self.data or "priceInfo" not in self.data:
        return []
    info = self.data["priceInfo"]
    combined: list[dict] = []
    for day in ("yesterday", "today", "tomorrow"):
        combined.extend(info.get(day, []))
    # Chronological order via the ISO-formatted start timestamp.
    combined.sort(key=lambda interval: interval["startsAt"])
    return combined
def get_interval_granularity(self) -> int | None:
"""Return the interval granularity in minutes (e.g., 15 or 60) for today's data."""
if not self.data or "priceInfo" not in self.data:
return None
today_prices = self.data["priceInfo"].get("today", [])
from .sensor import detect_interval_granularity
return detect_interval_granularity(today_prices) if today_prices else None
def get_current_interval_data(self) -> dict | None:
"""Return the price data for the current interval."""
if not self.data or "priceInfo" not in self.data:
return None
price_info = self.data["priceInfo"]
now = dt_util.now()
interval_length = self.get_interval_granularity()
from .sensor import find_price_data_for_interval
return find_price_data_for_interval(price_info, now, interval_length)
def get_combined_price_info(self) -> dict:
    """Return a dict with all intervals under a single key 'all'.

    Thin wrapper around ``get_all_intervals``.
    """
    return {"all": self.get_all_intervals()}
def is_tomorrow_data_available(self) -> bool | None:
"""Return True if tomorrow's data is fully available, False if not, None if unknown."""
if not self.data or "priceInfo" not in self.data:
return None
tomorrow_prices = self.data["priceInfo"].get("tomorrow", [])
interval_count = len(tomorrow_prices)
min_tomorrow_intervals_hourly = 24
min_tomorrow_intervals_15min = 96
tomorrow_interval_counts = {min_tomorrow_intervals_hourly, min_tomorrow_intervals_15min}
return interval_count in tomorrow_interval_counts
def _transform_api_response(self, data: dict[str, Any]) -> dict:
    """Transform API response to coordinator data format.

    Currently a pass-through; the cast only narrows the static type and
    has no runtime effect.
    """
    return cast("dict", data)