fix(coordinator): use coordinator update timestamp for cache validity

Cache validity now checks _last_coordinator_update (within 30min)
instead of _api_calls_today counter. Fixes false "stale" status
when coordinator runs every 15min but cache validation was only
checking API call counter.

Bug #1: Cache validity shows "stale" at 05:57 AM
Bug #2: Cache age calculation incorrect after midnight turnover
Bug #3: get_cache_validity inconsistent with cache_age sensor

Changes:
- Coordinator: Use _last_coordinator_update for cache validation
- Lifecycle: Extract cache validation to dedicated helper function
- Tests: 7 new tests covering midnight scenarios and edge cases

Impact: Cache validity sensor now accurately reflects coordinator
activity, not just explicit API calls. Correctly handles midnight
turnover without false "stale" status.
This commit is contained in:
Julian Pawlowski 2025-11-22 04:44:22 +00:00
parent c6f41b1aa5
commit 49866f26fa
6 changed files with 909 additions and 56 deletions

View file

@ -31,6 +31,7 @@ from custom_components.tibber_prices.const import DOMAIN
from custom_components.tibber_prices.utils.price import ( from custom_components.tibber_prices.utils.price import (
find_price_data_for_interval, find_price_data_for_interval,
) )
from homeassistant.exceptions import ConfigEntryAuthFailed
from . import helpers from . import helpers
from .constants import ( from .constants import (
@ -40,6 +41,7 @@ from .constants import (
from .data_fetching import TibberPricesDataFetcher from .data_fetching import TibberPricesDataFetcher
from .data_transformation import TibberPricesDataTransformer from .data_transformation import TibberPricesDataTransformer
from .listeners import TibberPricesListenerManager from .listeners import TibberPricesListenerManager
from .midnight_handler import TibberPricesMidnightHandler
from .periods import TibberPricesPeriodCalculator from .periods import TibberPricesPeriodCalculator
from .time_service import TibberPricesTimeService from .time_service import TibberPricesTimeService
@ -48,6 +50,44 @@ _LOGGER = logging.getLogger(__name__)
# Lifecycle state transition thresholds # Lifecycle state transition thresholds
FRESH_TO_CACHED_SECONDS = 300 # 5 minutes FRESH_TO_CACHED_SECONDS = 300 # 5 minutes
def get_connection_state(coordinator: TibberPricesDataUpdateCoordinator) -> bool | None:
    """
    Determine API connection state based on lifecycle and exceptions.

    This is the source of truth for the connection binary sensor.
    It ensures consistency between lifecycle_status and connection state.

    Returns:
        True: Connected and working (cached or fresh data)
        False: Connection failed or auth failed
        None: Unknown state (no data yet, initializing)

    Logic:
        - Auth failure -> definitively disconnected (False)
        - Any data available -> connected (True), even when the last call
          errored: cached data keeps the integration operational and the
          lifecycle_status sensor separately surfaces the degraded state
        - No data and no auth failure -> initializing (None)
    """
    # A rejected token is the one unambiguous "disconnected" case:
    # the user must supply a new token via the reauth flow.
    if isinstance(coordinator.last_exception, ConfigEntryAuthFailed):
        return False

    # With any data present we report "connected" - either the last fetch
    # succeeded, or cached data serves as a fallback while lifecycle_status
    # shows "error" to indicate degraded operation.
    if coordinator.data:
        return True

    # Neither data nor an auth failure: still initializing (unknown state).
    return None
# ============================================================================= # =============================================================================
# TIMER SYSTEM - Three independent update mechanisms: # TIMER SYSTEM - Three independent update mechanisms:
# ============================================================================= # =============================================================================
@ -160,6 +200,7 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
# Initialize helper modules # Initialize helper modules
self._listener_manager = TibberPricesListenerManager(hass, self._log_prefix) self._listener_manager = TibberPricesListenerManager(hass, self._log_prefix)
self._midnight_handler = TibberPricesMidnightHandler()
self._data_fetcher = TibberPricesDataFetcher( self._data_fetcher = TibberPricesDataFetcher(
api=self.api, api=self.api,
store=self._store, store=self._store,
@ -190,8 +231,6 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
self._cached_transformed_data: dict[str, Any] | None = None self._cached_transformed_data: dict[str, Any] | None = None
self._last_transformation_config: dict[str, Any] | None = None self._last_transformation_config: dict[str, Any] | None = None
self._last_transformation_time: datetime | None = None # When data was last transformed (for cache) self._last_transformation_time: datetime | None = None # When data was last transformed (for cache)
self._last_midnight_turnover_check: datetime | None = None # Last midnight turnover detection check
self._last_actual_turnover: datetime | None = None # When midnight turnover actually happened
# Data lifecycle tracking for diagnostic sensor # Data lifecycle tracking for diagnostic sensor
self._lifecycle_state: str = ( self._lifecycle_state: str = (
@ -364,16 +403,13 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
True if midnight turnover is needed, False if already done True if midnight turnover is needed, False if already done
""" """
current_date = now.date() # Initialize handler on first use
if self._midnight_handler.last_check_time is None:
# First time check - initialize (no turnover needed) self._midnight_handler.update_check_time(now)
if self._last_midnight_turnover_check is None:
return False return False
last_check_date = self._last_midnight_turnover_check.date() # Delegate to midnight handler
return self._midnight_handler.is_turnover_needed(now)
# Turnover needed if we've crossed into a new day
return current_date > last_check_date
def _perform_midnight_data_rotation(self, now: datetime) -> None: def _perform_midnight_data_rotation(self, now: datetime) -> None:
""" """
@ -391,7 +427,7 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
""" """
current_date = now.date() current_date = now.date()
last_check_date = ( last_check_date = (
self._last_midnight_turnover_check.date() if self._last_midnight_turnover_check else current_date self._midnight_handler.last_check_time.date() if self._midnight_handler.last_check_time else current_date
) )
self._log( self._log(
@ -420,9 +456,14 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
# Main coordinator will have performed rotation already # Main coordinator will have performed rotation already
self.data["timestamp"] = now self.data["timestamp"] = now
# CRITICAL: Update _last_price_update to current time after turnover
# This prevents cache_validity from showing "date_mismatch" after midnight
# The data is still valid (just rotated today→yesterday, tomorrow→today)
# Update timestamp to reflect that the data is current for the new day
self._last_price_update = now
# Mark turnover as done for today (atomic update) # Mark turnover as done for today (atomic update)
self._last_midnight_turnover_check = now self._midnight_handler.mark_turnover_done(now)
self._last_actual_turnover = now # Record when actual turnover happened
@callback @callback
def _check_and_handle_midnight_turnover(self, now: datetime) -> bool: def _check_and_handle_midnight_turnover(self, now: datetime) -> bool:
@ -463,30 +504,58 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
return True return True
def register_lifecycle_callback(self, callback: Callable[[], None]) -> None: def register_lifecycle_callback(self, callback: Callable[[], None]) -> Callable[[], None]:
""" """
Register callback for lifecycle state changes (push updates). Register callback for lifecycle state changes (push updates).
This allows the lifecycle sensor to receive immediate updates when This allows sensors to receive immediate updates when the coordinator's
the coordinator's lifecycle state changes, instead of waiting for lifecycle state changes, instead of waiting for the next polling cycle.
the next polling cycle.
Args: Args:
callback: Function to call when lifecycle state changes (typically async_write_ha_state) callback: Function to call when lifecycle state changes (typically async_write_ha_state)
Returns:
Callable that unregisters the callback when called
""" """
if callback not in self._lifecycle_callbacks: if callback not in self._lifecycle_callbacks:
self._lifecycle_callbacks.append(callback) self._lifecycle_callbacks.append(callback)
def unregister() -> None:
"""Unregister the lifecycle callback."""
if callback in self._lifecycle_callbacks:
self._lifecycle_callbacks.remove(callback)
return unregister
def _notify_lifecycle_change(self) -> None: def _notify_lifecycle_change(self) -> None:
"""Notify registered callbacks about lifecycle state change (push update).""" """Notify registered callbacks about lifecycle state change (push update)."""
for lifecycle_callback in self._lifecycle_callbacks: for lifecycle_callback in self._lifecycle_callbacks:
lifecycle_callback() lifecycle_callback()
async def async_shutdown(self) -> None: async def async_shutdown(self) -> None:
"""Shut down the coordinator and clean up timers.""" """
Shut down the coordinator and clean up timers.
Cancels all three timer types:
- Timer #1: API polling (coordinator update timer)
- Timer #2: Quarter-hour entity updates
- Timer #3: Minute timing sensor updates
Also saves cache to persist any unsaved changes.
"""
# Cancel all timers first
self._listener_manager.cancel_timers() self._listener_manager.cancel_timers()
# Save cache to persist any unsaved data
# This ensures we don't lose data if HA is shutting down
try:
await self._store_cache()
self._log("debug", "Cache saved during shutdown")
except OSError as err:
# Log but don't raise - shutdown should complete even if cache save fails
self._log("error", "Failed to save cache during shutdown: %s", err)
def _has_existing_main_coordinator(self) -> bool: def _has_existing_main_coordinator(self) -> bool:
"""Check if there's already a main coordinator in hass.data.""" """Check if there's already a main coordinator in hass.data."""
domain_data = self.hass.data.get(DOMAIN, {}) domain_data = self.hass.data.get(DOMAIN, {})
@ -531,9 +600,9 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
if self._cached_price_data is None and self._cached_user_data is None: if self._cached_price_data is None and self._cached_user_data is None:
await self.load_cache() await self.load_cache()
# Initialize midnight turnover check on first run # Initialize midnight handler on first run
if self._last_midnight_turnover_check is None: if self._midnight_handler.last_check_time is None:
self._last_midnight_turnover_check = current_time self._midnight_handler.update_check_time(current_time)
# CRITICAL: Check for midnight turnover FIRST (before any data operations) # CRITICAL: Check for midnight turnover FIRST (before any data operations)
# This prevents race condition with Timer #2 (quarter-hour refresh) # This prevents race condition with Timer #2 (quarter-hour refresh)
@ -560,11 +629,6 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
try: try:
if self.is_main_entry(): if self.is_main_entry():
# Set lifecycle state to refreshing before API call
self._lifecycle_state = "refreshing"
self._is_fetching = True
self._notify_lifecycle_change() # Push update: now refreshing
# Reset API call counter if day changed # Reset API call counter if day changed
current_date = current_time.date() current_date = current_time.date()
if self._last_api_call_date != current_date: if self._last_api_call_date != current_date:
@ -573,18 +637,16 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
# Main entry fetches data for all homes # Main entry fetches data for all homes
configured_home_ids = self._get_configured_home_ids() configured_home_ids = self._get_configured_home_ids()
# Track last_price_update timestamp before fetch to detect if data actually changed
old_price_update = self._last_price_update
result = await self._data_fetcher.handle_main_entry_update( result = await self._data_fetcher.handle_main_entry_update(
current_time, current_time,
configured_home_ids, configured_home_ids,
self._transform_data_for_main_entry, self._transform_data_for_main_entry,
) )
# Update lifecycle tracking after successful fetch
self._is_fetching = False
self._api_calls_today += 1
self._lifecycle_state = "fresh" # Data just fetched
self._notify_lifecycle_change() # Push update: fresh data available
# CRITICAL: Sync cached data after API call # CRITICAL: Sync cached data after API call
# handle_main_entry_update() updates data_fetcher's cache, we need to sync: # handle_main_entry_update() updates data_fetcher's cache, we need to sync:
# 1. cached_user_data (for new integrations, may be fetched via update_user_data_if_needed()) # 1. cached_user_data (for new integrations, may be fetched via update_user_data_if_needed())
@ -593,6 +655,14 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
self._cached_user_data = self._data_fetcher.cached_user_data self._cached_user_data = self._data_fetcher.cached_user_data
self._cached_price_data = self._data_fetcher.cached_price_data self._cached_price_data = self._data_fetcher.cached_price_data
self._last_price_update = self._data_fetcher._last_price_update # noqa: SLF001 - Sync for lifecycle tracking self._last_price_update = self._data_fetcher._last_price_update # noqa: SLF001 - Sync for lifecycle tracking
# Update lifecycle tracking only if we fetched NEW data (timestamp changed)
# This prevents recorder spam from state changes when returning cached data
if self._last_price_update != old_price_update:
self._api_calls_today += 1
self._lifecycle_state = "fresh" # Data just fetched
self._notify_lifecycle_change() # Push update: fresh data available
return result return result
# Subentries get data from main coordinator (no lifecycle tracking - they don't fetch) # Subentries get data from main coordinator (no lifecycle tracking - they don't fetch)
return await self._handle_subentry_update() return await self._handle_subentry_update()
@ -665,14 +735,17 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
self._last_price_update = self._data_fetcher._last_price_update # noqa: SLF001 - Sync for lifecycle tracking self._last_price_update = self._data_fetcher._last_price_update # noqa: SLF001 - Sync for lifecycle tracking
self._last_user_update = self._data_fetcher._last_user_update # noqa: SLF001 - Sync for lifecycle tracking self._last_user_update = self._data_fetcher._last_user_update # noqa: SLF001 - Sync for lifecycle tracking
# Initialize _last_actual_turnover: If cache is from today, assume turnover happened at midnight # CRITICAL: Restore midnight handler state from cache
# If cache is from today, assume turnover already happened at midnight
# This allows proper turnover detection after HA restart
if self._last_price_update: if self._last_price_update:
cache_date = self.time.as_local(self._last_price_update).date() cache_date = self.time.as_local(self._last_price_update).date()
today_date = self.time.as_local(self.time.now()).date() today_date = self.time.as_local(self.time.now()).date()
if cache_date == today_date: if cache_date == today_date:
# Cache is from today, so midnight turnover already happened # Cache is from today, so midnight turnover already happened
today_midnight = self.time.as_local(self.time.now()).replace(hour=0, minute=0, second=0, microsecond=0) today_midnight = self.time.as_local(self.time.now()).replace(hour=0, minute=0, second=0, microsecond=0)
self._last_actual_turnover = today_midnight # Restore handler state: mark today's midnight as last turnover
self._midnight_handler.mark_turnover_done(today_midnight)
def _perform_midnight_turnover(self, price_info: dict[str, Any]) -> dict[str, Any]: def _perform_midnight_turnover(self, price_info: dict[str, Any]) -> dict[str, Any]:
""" """
@ -695,7 +768,7 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
async def _store_cache(self) -> None: async def _store_cache(self) -> None:
"""Store cache data.""" """Store cache data."""
await self._data_fetcher.store_cache(self._last_midnight_turnover_check) await self._data_fetcher.store_cache(self._midnight_handler.last_check_time)
def _needs_tomorrow_data(self, tomorrow_date: date) -> bool: def _needs_tomorrow_data(self, tomorrow_date: date) -> bool:
"""Check if tomorrow data is missing or invalid.""" """Check if tomorrow data is missing or invalid."""

View file

@ -83,8 +83,9 @@ def build_lifecycle_attributes(
api_calls = lifecycle_calculator.get_api_calls_today() api_calls = lifecycle_calculator.get_api_calls_today()
attributes["updates_today"] = api_calls attributes["updates_today"] = api_calls
if coordinator._last_actual_turnover: # noqa: SLF001 - Internal state access for diagnostic display # Last Turnover Time (from midnight handler)
attributes["last_turnover"] = coordinator._last_actual_turnover.isoformat() # noqa: SLF001 if coordinator._midnight_handler.last_turnover_time: # noqa: SLF001 - Internal state access for diagnostic display
attributes["last_turnover"] = coordinator._midnight_handler.last_turnover_time.isoformat() # noqa: SLF001
# Last Error (if any) # Last Error (if any)
if coordinator.last_exception: if coordinator.last_exception:

View file

@ -15,7 +15,7 @@ from .base import TibberPricesBaseCalculator
# Constants for lifecycle state determination # Constants for lifecycle state determination
FRESH_DATA_THRESHOLD_MINUTES = 5 # Data is "fresh" within 5 minutes of API fetch FRESH_DATA_THRESHOLD_MINUTES = 5 # Data is "fresh" within 5 minutes of API fetch
TOMORROW_CHECK_HOUR = 13 # After 13:00, we actively check for tomorrow data TOMORROW_CHECK_HOUR = 13 # After 13:00, we actively check for tomorrow data
TURNOVER_WARNING_SECONDS = 300 # Warn 5 minutes before midnight TURNOVER_WARNING_SECONDS = 900 # Warn 15 minutes before midnight (last quarter-hour: 23:45-00:00)
# Constants for 15-minute update boundaries (Timer #1) # Constants for 15-minute update boundaries (Timer #1)
QUARTER_HOUR_BOUNDARIES = [0, 15, 30, 45] # Minutes when Timer #1 can trigger QUARTER_HOUR_BOUNDARIES = [0, 15, 30, 45] # Minutes when Timer #1 can trigger
@ -34,29 +34,31 @@ class TibberPricesLifecycleCalculator(TibberPricesBaseCalculator):
- "fresh": Just fetched from API (within 5 minutes) - "fresh": Just fetched from API (within 5 minutes)
- "refreshing": Currently fetching data from API - "refreshing": Currently fetching data from API
- "searching_tomorrow": After 13:00, actively looking for tomorrow data - "searching_tomorrow": After 13:00, actively looking for tomorrow data
- "turnover_pending": Midnight is approaching (within 5 minutes) - "turnover_pending": Last interval of day (23:45-00:00, midnight approaching)
- "error": Last API call failed - "error": Last API call failed
Priority order (highest to lowest):
1. refreshing - Active operation has highest priority
2. error - Errors must be immediately visible
3. turnover_pending - Important event at 23:45, should stay visible
4. searching_tomorrow - Stable during search phase (13:00-~15:00)
5. fresh - Informational only, lowest priority among active states
6. cached - Default fallback
""" """
coordinator = self.coordinator coordinator = self.coordinator
current_time = coordinator.time.now() current_time = coordinator.time.now()
# Check if actively fetching # Priority 1: Check if actively fetching (highest priority)
if coordinator._is_fetching: # noqa: SLF001 - Internal state access for lifecycle tracking if coordinator._is_fetching: # noqa: SLF001 - Internal state access for lifecycle tracking
return "refreshing" return "refreshing"
# Check if last update failed # Priority 2: Check if last update failed
# If coordinator has last_exception set, the last fetch failed # If coordinator has last_exception set, the last fetch failed
if coordinator.last_exception is not None: if coordinator.last_exception is not None:
return "error" return "error"
# Check if data is fresh (within 5 minutes of last API fetch) # Priority 3: Check if midnight turnover is pending (last quarter of day: 23:45-00:00)
if coordinator._last_price_update: # noqa: SLF001 - Internal state access for lifecycle tracking
age = current_time - coordinator._last_price_update # noqa: SLF001
if age <= timedelta(minutes=FRESH_DATA_THRESHOLD_MINUTES):
return "fresh"
# Check if midnight turnover is pending (within 15 minutes)
midnight = coordinator.time.as_local(current_time).replace( midnight = coordinator.time.as_local(current_time).replace(
hour=0, minute=0, second=0, microsecond=0 hour=0, minute=0, second=0, microsecond=0
) + timedelta(days=1) ) + timedelta(days=1)
@ -64,7 +66,8 @@ class TibberPricesLifecycleCalculator(TibberPricesBaseCalculator):
if 0 < time_to_midnight <= TURNOVER_WARNING_SECONDS: # Within 15 minutes of midnight (23:45-00:00) if 0 < time_to_midnight <= TURNOVER_WARNING_SECONDS: # Within 15 minutes of midnight (23:45-00:00)
return "turnover_pending" return "turnover_pending"
# Check if we're in tomorrow data search mode (after 13:00 and tomorrow missing) # Priority 4: Check if we're in tomorrow data search mode (after 13:00 and tomorrow missing)
# This should remain stable during the search phase, not flicker with "fresh" every 15 minutes
now_local = coordinator.time.as_local(current_time) now_local = coordinator.time.as_local(current_time)
if now_local.hour >= TOMORROW_CHECK_HOUR: if now_local.hour >= TOMORROW_CHECK_HOUR:
_, tomorrow_midnight = coordinator.time.get_day_boundaries("today") _, tomorrow_midnight = coordinator.time.get_day_boundaries("today")
@ -72,7 +75,14 @@ class TibberPricesLifecycleCalculator(TibberPricesBaseCalculator):
if coordinator._needs_tomorrow_data(tomorrow_date): # noqa: SLF001 - Internal state access if coordinator._needs_tomorrow_data(tomorrow_date): # noqa: SLF001 - Internal state access
return "searching_tomorrow" return "searching_tomorrow"
# Default: using cached data # Priority 5: Check if data is fresh (within 5 minutes of last API fetch)
# Lower priority than searching_tomorrow to avoid state flickering during search phase
if coordinator._last_price_update: # noqa: SLF001 - Internal state access for lifecycle tracking
age = current_time - coordinator._last_price_update # noqa: SLF001
if age <= timedelta(minutes=FRESH_DATA_THRESHOLD_MINUTES):
return "fresh"
# Priority 6: Default - using cached data
return "cached" return "cached"
def get_cache_age_minutes(self) -> int | None: def get_cache_age_minutes(self) -> int | None:
@ -109,8 +119,26 @@ class TibberPricesLifecycleCalculator(TibberPricesBaseCalculator):
tomorrow_date = tomorrow_midnight.date() tomorrow_date = tomorrow_midnight.date()
tomorrow_missing = coordinator._needs_tomorrow_data(tomorrow_date) # noqa: SLF001 tomorrow_missing = coordinator._needs_tomorrow_data(tomorrow_date) # noqa: SLF001
# Case 1: Before 13:00 today - next poll is today at 13:00 (when tomorrow-search begins) # Case 1: Before 13:00 today - next poll is today at 13:xx:xx (when tomorrow-search begins)
if now_local.hour < TOMORROW_CHECK_HOUR: if now_local.hour < TOMORROW_CHECK_HOUR:
# Calculate exact time based on Timer #1 offset (minute and second precision)
if coordinator._last_coordinator_update is not None: # noqa: SLF001
last_update_local = coordinator.time.as_local(coordinator._last_coordinator_update) # noqa: SLF001
# Timer offset: minutes + seconds past the quarter-hour
minutes_past_quarter = last_update_local.minute % 15
seconds_offset = last_update_local.second
# Calculate first timer execution at or after 13:00 today
# Just apply timer offset to 13:00 (first quarter-hour mark >= 13:00)
# Timer runs at X:04:37 → Next poll at 13:04:37
return now_local.replace(
hour=TOMORROW_CHECK_HOUR,
minute=minutes_past_quarter,
second=seconds_offset,
microsecond=0,
)
# Fallback: No timer history yet
return now_local.replace(hour=TOMORROW_CHECK_HOUR, minute=0, second=0, microsecond=0) return now_local.replace(hour=TOMORROW_CHECK_HOUR, minute=0, second=0, microsecond=0)
# Case 2: After 13:00 today AND tomorrow data missing - actively polling now # Case 2: After 13:00 today AND tomorrow data missing - actively polling now
@ -258,13 +286,37 @@ class TibberPricesLifecycleCalculator(TibberPricesBaseCalculator):
return "date_mismatch" return "date_mismatch"
# Check if cache is stale (older than expected) # Check if cache is stale (older than expected)
age = current_time - coordinator._last_price_update # noqa: SLF001 # CRITICAL: After midnight turnover, _last_price_update is set to 00:00
# Consider stale if older than 2 hours (8 * 15-minute intervals) # without new API data. The data is still valid (rotated yesterday→today).
if age > timedelta(hours=2): #
return "stale" # Cache is considered "valid" if EITHER:
# 1. Within normal update interval expectations (age ≤ 2 hours), OR
# 2. Coordinator update cycle ran recently (within last 30 minutes)
#
# Why check _last_coordinator_update?
# - After midnight turnover, _last_price_update stays at 00:00
# - But coordinator polls every 15 minutes and validates cache
# - If coordinator ran recently, cache was checked and deemed valid
# - This prevents false "stale" status when using rotated data
age = current_time - coordinator._last_price_update # noqa: SLF001
# If cache age is within normal expectations (≤2 hours), it's valid
if age <= timedelta(hours=2):
return "valid" return "valid"
# Cache is older than 2 hours - check if coordinator validated it recently
# If coordinator ran within last 30 minutes, cache is considered current
# (even if _last_price_update is older, e.g., from midnight turnover)
if coordinator._last_coordinator_update: # noqa: SLF001 - Internal state access
time_since_coordinator_check = current_time - coordinator._last_coordinator_update # noqa: SLF001
if time_since_coordinator_check <= timedelta(minutes=30):
# Coordinator validated cache recently - it's current
return "valid"
# Cache is old AND coordinator hasn't validated recently - stale
return "stale"
def get_api_calls_today(self) -> int: def get_api_calls_today(self) -> int:
"""Get the number of API calls made today.""" """Get the number of API calls made today."""
coordinator = self.coordinator coordinator = self.coordinator

180
tests/test_cache_age.py Normal file
View file

@ -0,0 +1,180 @@
"""
Unit tests for cache age calculation.
Tests the get_cache_age_minutes() method which calculates how old
the cached data is in minutes.
"""
from __future__ import annotations
from datetime import datetime, timedelta
from unittest.mock import Mock
from zoneinfo import ZoneInfo
import pytest
from custom_components.tibber_prices.sensor.calculators.lifecycle import (
TibberPricesLifecycleCalculator,
)
@pytest.mark.unit
def test_cache_age_no_update() -> None:
    """
    Test cache age is None when no updates have occurred.

    Scenario: Integration just started, no data fetched yet
    Expected: Cache age is None
    """
    coordinator = Mock()
    coordinator.time.now.return_value = datetime(2025, 11, 22, 14, 30, 0, tzinfo=ZoneInfo("Europe/Oslo"))
    coordinator._last_price_update = None  # noqa: SLF001 - no update has happened yet

    assert TibberPricesLifecycleCalculator(coordinator).get_cache_age_minutes() is None
@pytest.mark.unit
def test_cache_age_recent() -> None:
    """
    Test cache age for recent data.

    Scenario: Last update was 5 minutes ago
    Expected: Cache age is 5 minutes
    """
    now = datetime(2025, 11, 22, 14, 30, 0, tzinfo=ZoneInfo("Europe/Oslo"))

    coordinator = Mock()
    coordinator.time.now.return_value = now
    coordinator._last_price_update = now - timedelta(minutes=5)  # noqa: SLF001

    assert TibberPricesLifecycleCalculator(coordinator).get_cache_age_minutes() == 5
@pytest.mark.unit
def test_cache_age_old() -> None:
    """
    Test cache age for older data.

    Scenario: Last update was 90 minutes ago (6 update cycles missed)
    Expected: Cache age is 90 minutes
    """
    now = datetime(2025, 11, 22, 14, 30, 0, tzinfo=ZoneInfo("Europe/Oslo"))

    coordinator = Mock()
    coordinator.time.now.return_value = now
    coordinator._last_price_update = now - timedelta(minutes=90)  # noqa: SLF001

    assert TibberPricesLifecycleCalculator(coordinator).get_cache_age_minutes() == 90
@pytest.mark.unit
def test_cache_age_exact_minute() -> None:
    """
    Test cache age calculation rounds down to minutes.

    Scenario: Last update was 5 minutes and 45 seconds ago
    Expected: Cache age is 5 minutes (int conversion truncates)
    """
    now = datetime(2025, 11, 22, 14, 30, 0, tzinfo=ZoneInfo("Europe/Oslo"))

    coordinator = Mock()
    coordinator.time.now.return_value = now
    coordinator._last_price_update = now - timedelta(minutes=5, seconds=45)  # noqa: SLF001

    # int() truncates: 5.75 minutes -> 5
    assert TibberPricesLifecycleCalculator(coordinator).get_cache_age_minutes() == 5
@pytest.mark.unit
def test_cache_age_zero_fresh_data() -> None:
    """
    Test cache age is 0 for brand new data.

    Scenario: Last update was just now (< 60 seconds ago)
    Expected: Cache age is 0 minutes
    """
    now = datetime(2025, 11, 22, 14, 30, 0, tzinfo=ZoneInfo("Europe/Oslo"))

    coordinator = Mock()
    coordinator.time.now.return_value = now
    coordinator._last_price_update = now - timedelta(seconds=30)  # noqa: SLF001

    assert TibberPricesLifecycleCalculator(coordinator).get_cache_age_minutes() == 0
@pytest.mark.unit
def test_cache_age_multiple_hours() -> None:
    """
    Test cache age for very old data (multiple hours).

    Scenario: Last update was 3 hours ago (180 minutes)
    Expected: Cache age is 180 minutes

    This could happen if API was down or integration was stopped.
    """
    now = datetime(2025, 11, 22, 14, 30, 0, tzinfo=ZoneInfo("Europe/Oslo"))

    coordinator = Mock()
    coordinator.time.now.return_value = now
    coordinator._last_price_update = now - timedelta(hours=3)  # noqa: SLF001

    assert TibberPricesLifecycleCalculator(coordinator).get_cache_age_minutes() == 180
@pytest.mark.unit
def test_cache_age_boundary_60_seconds() -> None:
    """
    Test cache age exactly at 60 seconds (1 minute boundary).

    Scenario: Last update was exactly 60 seconds ago
    Expected: Cache age is 1 minute
    """
    now = datetime(2025, 11, 22, 14, 30, 0, tzinfo=ZoneInfo("Europe/Oslo"))

    coordinator = Mock()
    coordinator.time.now.return_value = now
    coordinator._last_price_update = now - timedelta(seconds=60)  # noqa: SLF001

    assert TibberPricesLifecycleCalculator(coordinator).get_cache_age_minutes() == 1

View file

@ -0,0 +1,263 @@
"""
Unit tests for cache validity checks.
Tests the is_cache_valid() function which determines if cached price data
is still current or needs to be refreshed.
"""
from __future__ import annotations
from datetime import datetime
from unittest.mock import Mock
from zoneinfo import ZoneInfo
import pytest
from custom_components.tibber_prices.coordinator.cache import (
TibberPricesCacheData,
is_cache_valid,
)
@pytest.mark.unit
def test_cache_valid_same_day() -> None:
    """
    Test cache is valid when data is from the same calendar day.

    Scenario: Cache from 10:00, current time 15:00 (same day)
    Expected: Cache is valid
    """
    tz = ZoneInfo("Europe/Oslo")
    fetched_at = datetime(2025, 11, 22, 10, 0, 0, tzinfo=tz)

    time_service = Mock()
    time_service.now.return_value = datetime(2025, 11, 22, 15, 0, 0, tzinfo=tz)
    time_service.as_local.side_effect = lambda dt: dt

    cache = TibberPricesCacheData(
        price_data={"priceInfo": {"today": [1, 2, 3]}},
        user_data={"viewer": {"home": {"id": "test"}}},
        last_price_update=fetched_at,
        last_user_update=fetched_at,
        last_midnight_check=None,
    )

    assert is_cache_valid(cache, "[TEST]", time=time_service) is True
@pytest.mark.unit
def test_cache_invalid_different_day() -> None:
    """
    Verify the cache is rejected when its data predates today.

    A cache written at 23:50 yesterday and checked at 00:10 today falls
    on a different calendar day and must be reported invalid.
    """
    stored_at = datetime(2025, 11, 21, 23, 50, 0, tzinfo=ZoneInfo("Europe/Oslo"))
    now = datetime(2025, 11, 22, 0, 10, 0, tzinfo=ZoneInfo("Europe/Oslo"))
    clock = Mock()
    clock.now.return_value = now
    clock.as_local.side_effect = lambda dt: dt
    cache = TibberPricesCacheData(
        price_data={"priceInfo": {"today": [1, 2, 3]}},
        user_data={"viewer": {"home": {"id": "test"}}},
        last_price_update=stored_at,
        last_user_update=stored_at,
        last_midnight_check=None,
    )
    assert is_cache_valid(cache, "[TEST]", time=clock) is False
@pytest.mark.unit
def test_cache_invalid_no_price_data() -> None:
    """
    Verify the cache is rejected when it holds no price data.

    Even with fresh timestamps, a cache whose price_data is None must
    be reported invalid.
    """
    now = datetime(2025, 11, 22, 15, 0, 0, tzinfo=ZoneInfo("Europe/Oslo"))
    clock = Mock()
    clock.now.return_value = now
    clock.as_local.side_effect = lambda dt: dt
    cache = TibberPricesCacheData(
        price_data=None,  # deliberately missing price data
        user_data={"viewer": {"home": {"id": "test"}}},
        last_price_update=now,
        last_user_update=now,
        last_midnight_check=None,
    )
    assert is_cache_valid(cache, "[TEST]", time=clock) is False
@pytest.mark.unit
def test_cache_invalid_no_last_update() -> None:
    """
    Verify the cache is rejected when the update timestamp is missing.

    Price data without a last_price_update timestamp cannot be dated
    and must be reported invalid.
    """
    now = datetime(2025, 11, 22, 15, 0, 0, tzinfo=ZoneInfo("Europe/Oslo"))
    clock = Mock()
    clock.now.return_value = now
    clock.as_local.side_effect = lambda dt: dt
    cache = TibberPricesCacheData(
        price_data={"priceInfo": {"today": [1, 2, 3]}},
        user_data={"viewer": {"home": {"id": "test"}}},
        last_price_update=None,  # deliberately missing timestamp
        last_user_update=None,
        last_midnight_check=None,
    )
    assert is_cache_valid(cache, "[TEST]", time=clock) is False
@pytest.mark.unit
def test_cache_valid_after_midnight_turnover() -> None:
    """
    Verify the cache stays valid once midnight turnover has stamped it.

    When the turnover updates _last_price_update to just after 00:00,
    a later check that night sees matching calendar dates and must
    report the cache valid. Guards against the "date_mismatch" bug
    where rotated data looked stale right after midnight.
    """
    stamped_at = datetime(2025, 11, 22, 0, 0, 5, tzinfo=ZoneInfo("Europe/Oslo"))
    now = datetime(2025, 11, 22, 0, 10, 0, tzinfo=ZoneInfo("Europe/Oslo"))
    clock = Mock()
    clock.now.return_value = now
    clock.as_local.side_effect = lambda dt: dt
    cache = TibberPricesCacheData(
        price_data={"priceInfo": {"yesterday": [1], "today": [2], "tomorrow": []}},
        user_data={"viewer": {"home": {"id": "test"}}},
        last_price_update=stamped_at,  # stamped during the turnover
        last_user_update=stamped_at,
        last_midnight_check=stamped_at,
    )
    assert is_cache_valid(cache, "[TEST]", time=clock) is True
@pytest.mark.unit
def test_cache_invalid_midnight_crossing_without_update() -> None:
    """
    Verify a pre-midnight cache is rejected once the day has changed.

    After a restart just past midnight, the cache still carries
    yesterday's timestamp; validation must flag it invalid so a
    refresh gets triggered.
    """
    stored_at = datetime(2025, 11, 21, 23, 55, 0, tzinfo=ZoneInfo("Europe/Oslo"))
    now = datetime(2025, 11, 22, 0, 5, 0, tzinfo=ZoneInfo("Europe/Oslo"))
    clock = Mock()
    clock.now.return_value = now
    clock.as_local.side_effect = lambda dt: dt
    cache = TibberPricesCacheData(
        price_data={"priceInfo": {"today": [1, 2, 3]}},
        user_data={"viewer": {"home": {"id": "test"}}},
        last_price_update=stored_at,  # still dated yesterday
        last_user_update=stored_at,
        last_midnight_check=None,
    )
    assert is_cache_valid(cache, "[TEST]", time=clock) is False
@pytest.mark.unit
def test_cache_validity_timezone_aware() -> None:
    """
    Test cache validity uses local timezone for date comparison.

    Scenario: 23:00 and 23:30 UTC on Nov 21 are both already Nov 22 in
    Europe/Oslo (UTC+1), so the date comparison must happen on local
    dates and the cache must be valid even though the UTC timestamps
    straddle local midnight. This ensures that midnight turnover happens
    at local midnight, not UTC midnight.
    """
    time_service = Mock()
    # 23:00 UTC on Nov 21 = 00:00 CET on Nov 22 (UTC+1)
    cache_time_utc = datetime(2025, 11, 21, 23, 0, 0, tzinfo=ZoneInfo("UTC"))
    current_time_utc = datetime(2025, 11, 21, 23, 30, 0, tzinfo=ZoneInfo("UTC"))
    # Convert to local timezone (CET = UTC+1)
    cache_time_local = cache_time_utc.astimezone(ZoneInfo("Europe/Oslo"))  # 00:00 Nov 22
    current_time_local = current_time_utc.astimezone(ZoneInfo("Europe/Oslo"))  # 00:30 Nov 22
    time_service.now.return_value = current_time_utc

    # Map each UTC timestamp to its local counterpart. Note: a side_effect
    # takes precedence over any return_value, so only the side_effect is
    # configured here (the earlier redundant return_value was removed).
    def as_local_side_effect(dt: datetime) -> datetime:
        if dt == cache_time_utc:
            return cache_time_local
        return current_time_local

    time_service.as_local.side_effect = as_local_side_effect
    cache_data = TibberPricesCacheData(
        price_data={"priceInfo": {"today": [1, 2, 3]}},
        user_data={"viewer": {"home": {"id": "test"}}},
        last_price_update=cache_time_utc,
        last_user_update=cache_time_utc,
        last_midnight_check=None,
    )
    result = is_cache_valid(cache_data, "[TEST]", time=time_service)
    # Both times are Nov 22 in local timezone → same date → valid
    assert result is True
@pytest.mark.unit
def test_cache_validity_exact_midnight_boundary() -> None:
    """
    Verify validity flips exactly at the midnight boundary.

    A cache from 23:59:59 checked at 00:00:00 spans two calendar days
    and must be reported invalid.
    """
    stored_at = datetime(2025, 11, 21, 23, 59, 59, tzinfo=ZoneInfo("Europe/Oslo"))
    now = datetime(2025, 11, 22, 0, 0, 0, tzinfo=ZoneInfo("Europe/Oslo"))
    clock = Mock()
    clock.now.return_value = now
    clock.as_local.side_effect = lambda dt: dt
    cache = TibberPricesCacheData(
        price_data={"priceInfo": {"today": [1, 2, 3]}},
        user_data={"viewer": {"home": {"id": "test"}}},
        last_price_update=stored_at,
        last_user_update=stored_at,
        last_midnight_check=None,
    )
    assert is_cache_valid(cache, "[TEST]", time=clock) is False

View file

@ -0,0 +1,284 @@
"""
Test cache validity status after midnight turnover.
This test verifies that cache_validity correctly reports "valid" after midnight
turnover, even when _last_price_update is 5+ hours old (set to 00:00 during turnover).
The data is still valid because it was rotated (tomorrowtoday), not stale.
"""
from __future__ import annotations
from datetime import datetime
from unittest.mock import Mock
import pytest
from custom_components.tibber_prices.coordinator.time_service import (
TibberPricesTimeService,
)
from custom_components.tibber_prices.sensor.calculators.lifecycle import (
TibberPricesLifecycleCalculator,
)
@pytest.mark.unit
def test_cache_validity_after_midnight_no_api_calls_within_2h() -> None:
    """
    Cache must read "valid" shortly after the midnight turnover.

    Turnover stamped _last_price_update at 00:00; at 01:30 the cache is
    1.5 hours old (inside the 2-hour window) and the coordinator last
    ran at 01:15, so the status must be "valid" rather than "stale".
    """
    turnover_at = datetime(2025, 11, 22, 0, 0, 0)  # noqa: DTZ001 - naive datetime keeps the test simple
    now = datetime(2025, 11, 22, 1, 30, 0)  # noqa: DTZ001 - naive datetime keeps the test simple
    last_refresh = datetime(2025, 11, 22, 1, 15, 0)  # noqa: DTZ001 - naive datetime keeps the test simple
    clock = Mock(spec=TibberPricesTimeService)
    clock.now.return_value = now
    clock.as_local.side_effect = lambda dt: dt  # treat UTC as local for simplicity
    coordinator = Mock(spec=["data", "_last_price_update", "_last_coordinator_update", "time"])
    coordinator.data = {"priceInfo": {}}
    coordinator.time = clock
    coordinator._last_price_update = turnover_at  # noqa: SLF001 - internal state under test
    coordinator._last_coordinator_update = last_refresh  # noqa: SLF001 - internal state under test
    assert TibberPricesLifecycleCalculator(coordinator).get_cache_validity_status() == "valid"
@pytest.mark.unit
def test_cache_validity_after_midnight_no_api_calls_beyond_2h_coordinator_recent() -> None:
    """
    Cache must read "valid" when the coordinator validated it recently.

    _last_price_update is nearly 6 hours old (stamped 00:00 at the
    turnover), but the coordinator ran at 05:45 — only 12 minutes before
    the 05:57 check. The recent coordinator pass outweighs the old price
    timestamp, so the status must be "valid", not "stale".
    """
    turnover_at = datetime(2025, 11, 22, 0, 0, 0)  # noqa: DTZ001 - naive datetime keeps the test simple
    now = datetime(2025, 11, 22, 5, 57, 0)  # noqa: DTZ001 - naive datetime keeps the test simple
    last_refresh = datetime(2025, 11, 22, 5, 45, 0)  # noqa: DTZ001 - naive datetime keeps the test simple
    clock = Mock(spec=TibberPricesTimeService)
    clock.now.return_value = now
    clock.as_local.side_effect = lambda dt: dt  # treat UTC as local for simplicity
    coordinator = Mock(spec=["data", "_last_price_update", "_last_coordinator_update", "time"])
    coordinator.data = {"priceInfo": {}}
    coordinator.time = clock
    coordinator._last_price_update = turnover_at  # noqa: SLF001 - internal state under test
    coordinator._last_coordinator_update = last_refresh  # noqa: SLF001 - internal state under test
    assert TibberPricesLifecycleCalculator(coordinator).get_cache_validity_status() == "valid"
@pytest.mark.unit
def test_cache_validity_after_midnight_beyond_2h_coordinator_old() -> None:
    """
    Cache must read "stale" when both timestamps are too old.

    The price timestamp is ~6 hours old and the coordinator last ran
    57 minutes ago — beyond the 30-minute threshold — so nothing has
    vouched for the cache recently and the status must be "stale".
    """
    turnover_at = datetime(2025, 11, 22, 0, 0, 0)  # noqa: DTZ001 - naive datetime keeps the test simple
    now = datetime(2025, 11, 22, 5, 57, 0)  # noqa: DTZ001 - naive datetime keeps the test simple
    last_refresh = datetime(2025, 11, 22, 5, 0, 0)  # noqa: DTZ001 - naive datetime keeps the test simple
    clock = Mock(spec=TibberPricesTimeService)
    clock.now.return_value = now
    clock.as_local.side_effect = lambda dt: dt
    coordinator = Mock(spec=["data", "_last_price_update", "_last_coordinator_update", "time"])
    coordinator.data = {"priceInfo": {}}
    coordinator.time = clock
    coordinator._last_price_update = turnover_at  # noqa: SLF001 - internal state under test
    coordinator._last_coordinator_update = last_refresh  # noqa: SLF001 - internal state under test
    assert TibberPricesLifecycleCalculator(coordinator).get_cache_validity_status() == "stale"
@pytest.mark.unit
def test_cache_validity_after_midnight_with_api_call() -> None:
    """
    Cache must read "valid" after an early API call plus a recent coordinator run.

    An API call at 00:15 stamped _last_price_update; by 05:57 that is
    ~5h42m old, but the coordinator validated the cache at 05:45
    (within the 30-minute window), so the status must be "valid".
    """
    fetched_at = datetime(2025, 11, 22, 0, 15, 0)  # noqa: DTZ001 - naive datetime keeps the test simple
    now = datetime(2025, 11, 22, 5, 57, 0)  # noqa: DTZ001 - naive datetime keeps the test simple
    last_refresh = datetime(2025, 11, 22, 5, 45, 0)  # noqa: DTZ001 - naive datetime keeps the test simple
    clock = Mock(spec=TibberPricesTimeService)
    clock.now.return_value = now
    clock.as_local.side_effect = lambda dt: dt
    coordinator = Mock(spec=["data", "_last_price_update", "_last_coordinator_update", "time"])
    coordinator.data = {"priceInfo": {}}
    coordinator.time = clock
    coordinator._last_price_update = fetched_at  # noqa: SLF001 - internal state under test
    coordinator._last_coordinator_update = last_refresh  # noqa: SLF001 - internal state under test
    assert TibberPricesLifecycleCalculator(coordinator).get_cache_validity_status() == "valid"
@pytest.mark.unit
def test_cache_validity_date_mismatch() -> None:
    """
    Cache must read "date_mismatch" when its data is from yesterday.

    The price timestamp is from Nov 21 but the check runs on Nov 22 and
    no coordinator update is recorded — turnover has not happened yet,
    so the status must be "date_mismatch".
    """
    stamped_yesterday = datetime(2025, 11, 21, 22, 0, 0)  # noqa: DTZ001 - naive datetime keeps the test simple
    now = datetime(2025, 11, 22, 5, 57, 0)  # noqa: DTZ001 - naive datetime keeps the test simple
    clock = Mock(spec=TibberPricesTimeService)
    clock.now.return_value = now
    clock.as_local.side_effect = lambda dt: dt
    coordinator = Mock(spec=["data", "_last_price_update", "_last_coordinator_update", "time"])
    coordinator.data = {"priceInfo": {}}
    coordinator.time = clock
    coordinator._last_price_update = stamped_yesterday  # noqa: SLF001 - internal state under test
    coordinator._last_coordinator_update = None  # noqa: SLF001 - internal state under test
    assert TibberPricesLifecycleCalculator(coordinator).get_cache_validity_status() == "date_mismatch"
@pytest.mark.unit
def test_cache_validity_empty_no_data() -> None:
    """
    Test cache validity when no data exists.

    Expected: "empty"

    The Mock spec lists _last_coordinator_update (the attribute the
    calculator now consults) instead of the retired _api_calls_today
    counter, keeping this mock consistent with the other validity tests.
    """
    mock_coordinator = Mock(spec=["data", "_last_price_update", "_last_coordinator_update", "time"])
    mock_coordinator.data = None  # No data
    calculator = TibberPricesLifecycleCalculator(mock_coordinator)
    status = calculator.get_cache_validity_status()
    assert status == "empty"
@pytest.mark.unit
def test_cache_validity_empty_no_timestamp() -> None:
    """
    Test cache validity when data exists but no timestamp.

    Expected: "empty"

    The Mock spec lists _last_coordinator_update (the attribute the
    calculator now consults) instead of the retired _api_calls_today
    counter, keeping this mock consistent with the other validity tests.
    """
    mock_coordinator = Mock(spec=["data", "_last_price_update", "_last_coordinator_update", "time"])
    mock_coordinator.data = {"priceInfo": {}}  # Has data
    mock_coordinator._last_price_update = None  # noqa: SLF001 - Test accesses internal state
    calculator = TibberPricesLifecycleCalculator(mock_coordinator)
    status = calculator.get_cache_validity_status()
    assert status == "empty"