This commit is contained in:
Julian Pawlowski 2025-05-20 23:19:45 +00:00
parent 0a11f38a1b
commit dd65f0efad
5 changed files with 228 additions and 133 deletions

View file

@ -7,7 +7,6 @@ https://github.com/jpawlowski/hass.tibber_prices
from __future__ import annotations
from datetime import timedelta
from typing import TYPE_CHECKING
from homeassistant.const import CONF_ACCESS_TOKEN, Platform
@ -16,7 +15,7 @@ from homeassistant.helpers.storage import Store
from homeassistant.loader import async_get_loaded_integration
from .api import TibberPricesApiClient
from .const import DOMAIN, LOGGER, SCAN_INTERVAL, async_load_translations
from .const import DOMAIN, LOGGER, async_load_translations
from .coordinator import STORAGE_VERSION, TibberPricesDataUpdateCoordinator
from .data import TibberPricesData
from .services import async_setup_services
@ -48,13 +47,11 @@ async def async_setup_entry(
# Register services when a config entry is loaded
async_setup_services(hass)
# Use the defined SCAN_INTERVAL constant for consistent polling
coordinator = TibberPricesDataUpdateCoordinator(
hass=hass,
entry=entry,
logger=LOGGER,
name=DOMAIN,
update_interval=timedelta(seconds=SCAN_INTERVAL),
)
entry.runtime_data = TibberPricesData(
client=TibberPricesApiClient(

View file

@ -705,3 +705,7 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
return None
else:
return attributes if attributes else None
async def async_update(self) -> None:
"""Force a refresh when homeassistant.update_entity is called."""
await self.coordinator.async_request_refresh()

View file

@ -21,9 +21,6 @@ CONF_PEAK_PRICE_FLEX = "peak_price_flex"
ATTRIBUTION = "Data provided by Tibber"
# Update interval in seconds
SCAN_INTERVAL = 60 * 5 # 5 minutes
# Integration name should match manifest.json
DEFAULT_NAME = "Tibber Price Information & Ratings"
DEFAULT_EXTENDED_DESCRIPTIONS = False

View file

@ -4,7 +4,6 @@ from __future__ import annotations
import asyncio
import logging
import secrets
from datetime import date, datetime, timedelta
from typing import TYPE_CHECKING, Any, Final, cast
@ -52,15 +51,14 @@ def _get_latest_timestamp_from_prices(
price_data: dict | None,
) -> datetime | None:
"""Get the latest timestamp from price data."""
if not price_data or "priceInfo" not in price_data:
if not price_data:
return None
try:
price_info = price_data["priceInfo"]
latest_timestamp = None
# Check today's prices
if today_prices := price_info.get("today"):
if today_prices := price_data.get("today"):
for price in today_prices:
if starts_at := price.get("startsAt"):
timestamp = dt_util.parse_datetime(starts_at)
@ -68,7 +66,7 @@ def _get_latest_timestamp_from_prices(
latest_timestamp = timestamp
# Check tomorrow's prices
if tomorrow_prices := price_info.get("tomorrow"):
if tomorrow_prices := price_data.get("tomorrow"):
for price in tomorrow_prices:
if starts_at := price.get("startsAt"):
timestamp = dt_util.parse_datetime(starts_at)
@ -130,7 +128,6 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict]):
self._last_rating_update_hourly: datetime | None = None
self._last_rating_update_daily: datetime | None = None
self._last_rating_update_monthly: datetime | None = None
self._scheduled_price_update: asyncio.Task | None = None
self._remove_update_listeners: list[Any] = []
self._force_update = False
self._rotation_lock = asyncio.Lock()
@ -138,7 +135,12 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict]):
self._random_update_minute: int | None = None
self._random_update_date: date | None = None
self._remove_update_listeners.append(
async_track_time_change(hass, self._async_refresh_hourly, minute=0, second=0)
async_track_time_change(
hass,
self._async_refresh_quarter_hour,
minute=[0, 15, 30, 45],
second=0,
)
)
async def async_shutdown(self) -> None:
@ -152,10 +154,14 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict]):
self._force_update = True
await self.async_refresh()
async def _async_refresh_hourly(self, now: datetime | None = None) -> None:
"""Handle the hourly refresh."""
async def _async_refresh_quarter_hour(self, now: datetime | None = None) -> None:
"""Refresh at every quarter hour, and rotate at midnight before update."""
if now and now.hour == 0 and now.minute == 0:
await self._perform_midnight_rotation()
if self._is_today_data_stale():
LOGGER.warning("Detected stale 'today' data (not from today) at midnight. Forcing full refresh.")
await self._fetch_all_data()
else:
await self._perform_midnight_rotation()
await self.async_refresh()
async def _async_update_data(self) -> dict:
@ -302,12 +308,10 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict]):
return {"priceRating": {rating_type: entries, "thresholdPercentages": threshold, "currency": currency}}
async def _async_initialize(self) -> None:
"""Load stored data in flat format."""
"""Load stored data in flat format and check for stale 'today' data."""
stored = await self._store.async_load()
if stored is None:
LOGGER.warning("No cache file found or cache is empty on startup.")
else:
LOGGER.debug("Loading stored data: %s", stored)
if stored:
self._cached_price_data = stored.get("price_data")
self._cached_rating_data_hourly = stored.get("rating_data_hourly")
@ -340,6 +344,10 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict]):
LOGGER.warning("Cached price data missing after cache load!")
if self._last_price_update is None:
LOGGER.warning("Price update timestamp missing after cache load!")
# Stale data detection on startup
if self._is_today_data_stale():
LOGGER.warning("Detected stale 'today' data on startup (not from today). Forcing full refresh.")
await self._fetch_all_data()
else:
LOGGER.info("No cache loaded; will fetch fresh data on first update.")
@ -383,31 +391,30 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict]):
return
async with self._rotation_lock:
try:
price_info = self._cached_price_data["priceInfo"]
today_count = len(price_info.get("today", []))
tomorrow_count = len(price_info.get("tomorrow", []))
yesterday_count = len(price_info.get("yesterday", []))
today_count = len(self._cached_price_data.get("today", []))
tomorrow_count = len(self._cached_price_data.get("tomorrow", []))
yesterday_count = len(self._cached_price_data.get("yesterday", []))
LOGGER.debug(
"Before rotation - Yesterday: %d, Today: %d, Tomorrow: %d items",
yesterday_count,
today_count,
tomorrow_count,
)
if today_data := price_info.get("today"):
price_info["yesterday"] = today_data
if today_data := self._cached_price_data.get("today"):
self._cached_price_data["yesterday"] = today_data
else:
LOGGER.warning("No today's data available to move to yesterday")
if tomorrow_data := price_info.get("tomorrow"):
price_info["today"] = tomorrow_data
price_info["tomorrow"] = []
if tomorrow_data := self._cached_price_data.get("tomorrow"):
self._cached_price_data["today"] = tomorrow_data
self._cached_price_data["tomorrow"] = []
else:
LOGGER.warning("No tomorrow's data available to move to today")
await self._store_cache()
LOGGER.info(
"Completed midnight rotation - Yesterday: %d, Today: %d, Tomorrow: %d items",
len(price_info.get("yesterday", [])),
len(price_info.get("today", [])),
len(price_info.get("tomorrow", [])),
len(self._cached_price_data.get("yesterday", [])),
len(self._cached_price_data.get("today", [])),
len(self._cached_price_data.get("tomorrow", [])),
)
self._force_update = True
except (KeyError, TypeError, ValueError) as ex:
@ -438,47 +445,49 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict]):
),
}
@callback
def _should_update_price_data(self, current_time: datetime) -> bool:
"""Decide if price data should be updated."""
current_hour = current_time.hour
def _log_update_decision(self, ctx: dict) -> None:
"""Log update decision context for debugging."""
LOGGER.debug("[tibber_prices] Update decision: %s", ctx)
def _get_tomorrow_data_status(self) -> tuple[int, bool]:
"""Return (interval_count, tomorrow_data_complete) for tomorrow's prices (flat structure)."""
tomorrow_prices = []
if self._cached_price_data and "priceInfo" in self._cached_price_data:
tomorrow_prices = self._cached_price_data["priceInfo"].get("tomorrow", [])
if self._cached_price_data:
raw_tomorrow = self._cached_price_data.get("tomorrow", [])
if raw_tomorrow is None:
LOGGER.warning(
"Tomorrow price data is None, treating as empty list. Full price_data: %s",
self._cached_price_data,
)
tomorrow_prices = []
elif not isinstance(raw_tomorrow, list):
LOGGER.warning(
"Tomorrow price data is not a list: %r. Full price_data: %s",
raw_tomorrow,
self._cached_price_data,
)
tomorrow_prices = list(raw_tomorrow) if hasattr(raw_tomorrow, "__iter__") else []
else:
tomorrow_prices = raw_tomorrow
else:
LOGGER.warning("No cached price_data available: %s", self._cached_price_data)
interval_count = len(tomorrow_prices)
min_tomorrow_intervals_hourly = 24
min_tomorrow_intervals_15min = 96
tomorrow_data_complete = interval_count in {min_tomorrow_intervals_hourly, min_tomorrow_intervals_15min}
should_update = False
if current_hour < PRICE_UPDATE_RANDOM_MIN_HOUR:
should_update = False
elif PRICE_UPDATE_RANDOM_MIN_HOUR <= current_hour < PRICE_UPDATE_RANDOM_MAX_HOUR:
today = current_time.date()
if self._random_update_date != today or self._random_update_minute is None:
self._random_update_date = today
self._random_update_minute = secrets.randbelow(RANDOM_DELAY_MAX_MINUTES)
if current_time.minute == self._random_update_minute:
if self._last_attempted_price_update:
since_last = current_time - self._last_attempted_price_update
if since_last < MIN_RETRY_INTERVAL:
LOGGER.debug(
"Skipping price update: last attempt was %s ago (<%s)",
since_last,
MIN_RETRY_INTERVAL,
)
should_update = False
else:
self._last_attempted_price_update = current_time
should_update = not tomorrow_data_complete
else:
self._last_attempted_price_update = current_time
should_update = not tomorrow_data_complete
else:
should_update = False
elif PRICE_UPDATE_RANDOM_MAX_HOUR <= current_hour < END_OF_DAY_HOUR:
should_update = not tomorrow_data_complete
else:
should_update = False
if interval_count == 0:
LOGGER.debug(
"Tomorrow price data is empty at late hour. Raw tomorrow data: %s | Full price_data: %s",
tomorrow_prices,
self._cached_price_data,
)
return interval_count, tomorrow_data_complete
@callback
def _should_update_price_data(self, current_time: datetime) -> bool:
"""Decide if price data should be updated. Logs all decision points for debugging."""
should_update, log_ctx = self._decide_price_update(current_time)
self._log_update_decision(log_ctx)
return should_update
@callback
@ -534,6 +543,9 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict]):
latest_timestamp = timestamp_func(cached_data)
if not latest_timestamp:
return True
# Always use last_update if present and valid
if last_update and (current_time - last_update) < interval:
return False
if not last_update:
last_update = latest_timestamp
if update_window:
@ -549,12 +561,21 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict]):
@callback
def _extract_data(self, data: dict, container_key: str, keys: tuple[str, ...]) -> dict:
"""Extract and harmonize data for caching in flat format."""
# For price data, just flatten to {key: list} for each key
try:
container = data[container_key]
if not isinstance(container, dict):
LOGGER.error(
"Extracted %s is not a dict: %r. Full data: %s",
container_key,
container,
data,
)
container = {}
extracted = {key: list(container.get(key, [])) for key in keys}
except (KeyError, IndexError, TypeError) as ex:
LOGGER.error("Error extracting %s data: %s", container_key, ex)
extracted = {key: [] for key in keys}
except (KeyError, IndexError, TypeError):
# For flat price data, just copy keys from data
extracted = {key: list(data.get(key, [])) for key in keys}
return extracted
@callback
@ -573,14 +594,19 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict]):
@callback
def _merge_all_cached_data(self) -> dict:
"""Merge all cached data into flat format."""
"""Merge all cached data into Home Assistant-style structure: priceInfo, priceRating, currency."""
if not self._cached_price_data:
return {}
if "priceInfo" in self._cached_price_data:
merged = {"priceInfo": self._cached_price_data["priceInfo"]}
else:
merged = {"priceInfo": self._cached_price_data}
price_rating = {"hourly": [], "daily": [], "monthly": [], "thresholdPercentages": None, "currency": None}
merged = {
"priceInfo": dict(self._cached_price_data), # 'today', 'tomorrow', 'yesterday' under 'priceInfo'
}
price_rating = {
"hourly": [],
"daily": [],
"monthly": [],
"thresholdPercentages": None,
"currency": None,
}
for rating_type, cached in zip(
["hourly", "daily", "monthly"],
[self._cached_rating_data_hourly, self._cached_rating_data_daily, self._cached_rating_data_monthly],
@ -604,27 +630,25 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict]):
stored_timestamp: str | None,
rating_type: str | None = None,
) -> datetime | None:
"""Recover timestamp from data or stored value."""
"""Recover timestamp from stored value or data."""
# Always prefer the stored timestamp if present and valid
if stored_timestamp:
return dt_util.parse_datetime(stored_timestamp)
ts = dt_util.parse_datetime(stored_timestamp)
if ts:
return ts
# Fallback to data-derived timestamp
if not data:
return None
if rating_type:
timestamp = self._get_latest_rating_timestamp(data, rating_type)
else:
timestamp = self._get_latest_price_timestamp(data)
if timestamp:
LOGGER.debug(
"Recovered %s timestamp from data: %s",
rating_type or "price",
timestamp,
)
else:
return None
return timestamp
@callback
@ -656,8 +680,10 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict]):
@callback
def _get_latest_price_timestamp(self, price_data: dict | None) -> datetime | None:
"""Get the latest timestamp from price data (today and tomorrow)."""
today = self._get_latest_timestamp(price_data, "priceInfo", "today", "startsAt")
tomorrow = self._get_latest_timestamp(price_data, "priceInfo", "tomorrow", "startsAt")
if not price_data:
return None
today = self._get_latest_timestamp(price_data, "today", None, "startsAt")
tomorrow = self._get_latest_timestamp(price_data, "tomorrow", None, "startsAt")
if today and tomorrow:
return max(today, tomorrow)
return today or tomorrow
@ -696,26 +722,26 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict]):
def get_all_intervals(self) -> list[dict]:
"""Return a combined, sorted list of all price intervals for yesterday, today, and tomorrow."""
if not self.data or "priceInfo" not in self.data:
return []
price_info = self.data["priceInfo"]
price_info = self.data.get("priceInfo", {}) if self.data else {}
all_prices = price_info.get("yesterday", []) + price_info.get("today", []) + price_info.get("tomorrow", [])
return sorted(all_prices, key=lambda p: p["startsAt"])
return sorted(
all_prices,
key=lambda p: dt_util.parse_datetime(p.get("startsAt") or "") or dt_util.now(),
)
def get_interval_granularity(self) -> int | None:
"""Return the interval granularity in minutes (e.g., 15 or 60) for today's data."""
if not self.data or "priceInfo" not in self.data:
return None
today_prices = self.data["priceInfo"].get("today", [])
price_info = self.data.get("priceInfo", {}) if self.data else {}
today_prices = price_info.get("today", [])
from .sensor import detect_interval_granularity
return detect_interval_granularity(today_prices) if today_prices else None
def get_current_interval_data(self) -> dict | None:
"""Return the price data for the current interval."""
if not self.data or "priceInfo" not in self.data:
price_info = self.data.get("priceInfo", {}) if self.data else {}
if not price_info:
return None
price_info = self.data["priceInfo"]
now = dt_util.now()
interval_length = self.get_interval_granularity()
from .sensor import find_price_data_for_interval
@ -728,9 +754,7 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict]):
def is_tomorrow_data_available(self) -> bool | None:
"""Return True if tomorrow's data is fully available, False if not, None if unknown."""
if not self.data or "priceInfo" not in self.data:
return None
tomorrow_prices = self.data["priceInfo"].get("tomorrow", [])
tomorrow_prices = self.data.get("priceInfo", {}).get("tomorrow", []) if self.data else []
interval_count = len(tomorrow_prices)
min_tomorrow_intervals_hourly = 24
min_tomorrow_intervals_15min = 96
@ -740,3 +764,94 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict]):
def _transform_api_response(self, data: dict[str, Any]) -> dict:
"""Transform API response to coordinator data format."""
return cast("dict", data)
def _should_update_random_window(self, current_time: datetime, log_ctx: dict) -> tuple[bool, dict]:
"""Determine if a random update should occur in the random window (13:00-15:00)."""
today = current_time.date()
if self._random_update_date != today or self._random_update_minute is None:
self._random_update_date = today
import secrets
self._random_update_minute = secrets.randbelow(RANDOM_DELAY_MAX_MINUTES)
log_ctx["window"] = "random"
log_ctx["random_update_minute"] = self._random_update_minute
log_ctx["current_minute"] = current_time.minute
if current_time.minute == self._random_update_minute:
if self._last_attempted_price_update:
since_last = current_time - self._last_attempted_price_update
log_ctx["since_last_attempt"] = str(since_last)
if since_last >= MIN_RETRY_INTERVAL:
self._last_attempted_price_update = current_time
log_ctx["reason"] = "random window, random minute, min retry met"
log_ctx["decision"] = True
return True, log_ctx
log_ctx["reason"] = "random window, random minute, min retry not met"
log_ctx["decision"] = False
return False, log_ctx
self._last_attempted_price_update = current_time
log_ctx["reason"] = "random window, first attempt"
log_ctx["decision"] = True
return True, log_ctx
log_ctx["reason"] = "random window, not random minute"
log_ctx["decision"] = False
return False, log_ctx
def _decide_price_update(self, current_time: datetime) -> tuple[bool, dict]:
current_hour = current_time.hour
log_ctx = {
"current_time": str(current_time),
"current_hour": current_hour,
"has_cached_price_data": bool(self._cached_price_data),
"last_price_update": str(self._last_price_update) if self._last_price_update else None,
}
should_update = False
if current_hour < PRICE_UPDATE_RANDOM_MIN_HOUR:
should_update = not self._cached_price_data
log_ctx["window"] = "early"
log_ctx["reason"] = "no cache" if should_update else "cache present"
log_ctx["decision"] = should_update
return should_update, log_ctx
interval_count, tomorrow_data_complete = self._get_tomorrow_data_status()
log_ctx["interval_count"] = interval_count
log_ctx["tomorrow_data_complete"] = tomorrow_data_complete
in_random_window = PRICE_UPDATE_RANDOM_MIN_HOUR <= current_hour < PRICE_UPDATE_RANDOM_MAX_HOUR
in_late_window = PRICE_UPDATE_RANDOM_MAX_HOUR <= current_hour < END_OF_DAY_HOUR
if (
tomorrow_data_complete
and self._last_price_update
and (current_time - self._last_price_update) < UPDATE_INTERVAL
):
should_update = False
log_ctx["window"] = "any"
log_ctx["reason"] = "tomorrow_data_complete and last_price_update < 24h"
log_ctx["decision"] = should_update
return should_update, log_ctx
if in_random_window and not tomorrow_data_complete:
return self._should_update_random_window(current_time, log_ctx)
if in_late_window and not tomorrow_data_complete:
should_update = True
log_ctx["window"] = "late"
log_ctx["reason"] = "late window, tomorrow data missing (force update)"
log_ctx["decision"] = should_update
return should_update, log_ctx
should_update = False
log_ctx["window"] = "late-or-random"
log_ctx["reason"] = "no update needed"
log_ctx["decision"] = should_update
return should_update, log_ctx
def _is_today_data_stale(self) -> bool:
"""Return True if the first 'today' interval is not from today (stale cache)."""
if not self._cached_price_data:
return True
today_prices = self._cached_price_data.get("today", [])
if not today_prices:
return True # No data, treat as stale
first = today_prices[0]
starts_at = first.get("startsAt")
if not starts_at:
return True
dt = dt_util.parse_datetime(starts_at)
if not dt:
return True
return dt.date() != dt_util.now().date()

View file

@ -274,15 +274,7 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity):
return self.coordinator.get_current_interval_data()
def _get_price_level_value(self) -> str | None:
"""
Get the current price level value as a translated string for the state.
The original (raw) value is stored for use as an attribute.
Returns:
The translated price level value for the state, or None if unavailable.
"""
"""Get the current price level value as a translated string for the state."""
current_interval_data = self._get_current_interval_data()
if not current_interval_data or "level" not in current_interval_data:
return None
@ -310,7 +302,7 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity):
"""Get price for current hour or with offset."""
if not self.coordinator.data:
return None
price_info = self.coordinator.data["priceInfo"]
price_info = self.coordinator.data.get("priceInfo", {})
# Use HomeAssistant's dt_util to get the current time in the user's timezone
now = dt_util.now()
@ -352,21 +344,10 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity):
return None
def _get_interval_price_value(self, *, interval_offset: int, in_euro: bool) -> float | None:
"""
Get price for the current interval or with offset, handling different interval granularities.
Args:
interval_offset: Number of intervals to offset from current time
in_euro: Whether to return value in EUR (True) or cents/kWh (False)
Returns:
Price value in the requested unit or None if not available
"""
"""Get price for the current interval or with offset, handling different interval granularities."""
if not self.coordinator.data:
return None
# Use coordinator utility for all intervals and granularity
all_intervals = self.coordinator.get_all_intervals()
granularity = self.coordinator.get_interval_granularity()
if not all_intervals or granularity is None:
@ -374,7 +355,6 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity):
now = dt_util.now()
# Find the current interval index
current_idx = None
for idx, interval in enumerate(all_intervals):
starts_at = interval.get("startsAt")
@ -407,7 +387,7 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity):
if not self.coordinator.data:
return None
price_info = self.coordinator.data["priceInfo"]
price_info = self.coordinator.data.get("priceInfo", {})
today_prices = price_info.get("today", [])
if not today_prices:
return None
@ -445,22 +425,20 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity):
if (
en_translations
and "sensor" in en_translations
and "price_rating" in en_translations["sensor"]
and "price_rating" in en_translations
and "price_levels" in en_translations["sensor"]["price_rating"]
and level in en_translations["sensor"]["price_rating"]["price_levels"]
):
return en_translations["sensor"]["price_rating"]["price_levels"][level]
return level
def _find_rating_entry(
self, entries: list[dict], now: datetime, rating_type: str, subscription: dict
) -> dict | None:
def _find_rating_entry(self, entries: list[dict], now: datetime, rating_type: str) -> dict | None:
"""Find the correct rating entry for the given type and time."""
if not entries:
return None
predicate = None
if rating_type == "hourly":
price_info = subscription.get("priceInfo", {})
price_info = self.coordinator.data.get("priceInfo", {})
today_prices = price_info.get("today", [])
data_granularity = detect_interval_granularity(today_prices) if today_prices else MINUTES_PER_INTERVAL
@ -512,7 +490,7 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity):
now = dt_util.now()
# In the new flat format, price_rating[rating_type] is a list of entries
entries = price_rating.get(rating_type, [])
entry = self._find_rating_entry(entries, now, rating_type, dict(self.coordinator.data))
entry = self._find_rating_entry(entries, now, rating_type)
if entry:
difference = entry.get("difference")
level = entry.get("level")
@ -528,7 +506,7 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity):
if not self.coordinator.data:
return None
price_info = self.coordinator.data["priceInfo"]
price_info = self.coordinator.data.get("priceInfo", {})
latest_timestamp = None
for day in ["today", "tomorrow"]:
@ -563,7 +541,7 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity):
if not self.coordinator.data:
return None
price_info = self.coordinator.data["priceInfo"]
price_info = self.coordinator.data.get("priceInfo", {})
price_rating = self.coordinator.data.get("priceRating", {})
# Determine data granularity from the current price data
@ -874,7 +852,7 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity):
# Add timestamp for next interval price sensors
if self.entity_description.key in ["next_interval_price", "next_interval_price_eur"]:
# Get the next interval's data
price_info = self.coordinator.data["priceInfo"]
price_info = self.coordinator.data.get("priceInfo", {})
today_prices = price_info.get("today", [])
data_granularity = detect_interval_granularity(today_prices) if today_prices else MINUTES_PER_INTERVAL
now = dt_util.now()
@ -912,7 +890,7 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity):
def _add_statistics_attributes(self, attributes: dict) -> None:
"""Add attributes for statistics, rating, and diagnostic sensors."""
key = self.entity_description.key
price_info = self.coordinator.data["priceInfo"]
price_info = self.coordinator.data.get("priceInfo", {})
now = dt_util.now()
if key == "price_rating":
today_prices = price_info.get("today", [])
@ -944,6 +922,10 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity):
first_timestamp = price_info.get("today", [{}])[0].get("startsAt")
attributes["timestamp"] = first_timestamp
async def async_update(self) -> None:
"""Force a refresh when homeassistant.update_entity is called."""
await self.coordinator.async_request_refresh()
def detect_interval_granularity(price_data: list[dict]) -> int:
"""