diff --git a/custom_components/tibber_prices/coordinator/core.py b/custom_components/tibber_prices/coordinator/core.py index b462003..2aeaa54 100644 --- a/custom_components/tibber_prices/coordinator/core.py +++ b/custom_components/tibber_prices/coordinator/core.py @@ -31,6 +31,7 @@ from custom_components.tibber_prices.const import DOMAIN from custom_components.tibber_prices.utils.price import ( find_price_data_for_interval, ) +from homeassistant.exceptions import ConfigEntryAuthFailed from . import helpers from .constants import ( @@ -40,6 +41,7 @@ from .constants import ( from .data_fetching import TibberPricesDataFetcher from .data_transformation import TibberPricesDataTransformer from .listeners import TibberPricesListenerManager +from .midnight_handler import TibberPricesMidnightHandler from .periods import TibberPricesPeriodCalculator from .time_service import TibberPricesTimeService @@ -48,6 +50,44 @@ _LOGGER = logging.getLogger(__name__) # Lifecycle state transition thresholds FRESH_TO_CACHED_SECONDS = 300 # 5 minutes + +def get_connection_state(coordinator: TibberPricesDataUpdateCoordinator) -> bool | None: + """ + Determine API connection state based on lifecycle and exceptions. + + This is the source of truth for the connection binary sensor. + It ensures consistency between lifecycle_status and connection state. + + Returns: + True: Connected and working (cached or fresh data) + False: Connection failed or auth failed + None: Unknown state (no data yet, initializing) + + Logic: + - Auth failures → definitively disconnected (False) + - Other errors with cached data → considered connected (True, using cache) + - No errors with data → connected (True) + - No data and no error → initializing (None) + + """ + # Auth failures = definitively disconnected + # User must provide new token via reauth flow + if isinstance(coordinator.last_exception, ConfigEntryAuthFailed): + return False + + # Other errors but cache available = considered connected (using cached data as fallback) + # This shows "on" but lifecycle_status will show "error" to indicate degraded operation + if coordinator.last_exception and coordinator.data: + return True + + # No error and data available = connected + if coordinator.data: + return True + + # No data and no error = initializing (unknown state) + return None + + # ============================================================================= # TIMER SYSTEM - Three independent update mechanisms: # ============================================================================= @@ -160,6 +200,7 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]): # Initialize helper modules self._listener_manager = TibberPricesListenerManager(hass, self._log_prefix) + self._midnight_handler = TibberPricesMidnightHandler() self._data_fetcher = TibberPricesDataFetcher( api=self.api, store=self._store, @@ -190,8 +231,6 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]): self._cached_transformed_data: dict[str, Any] | None = None self._last_transformation_config: dict[str, Any] | None = None self._last_transformation_time: datetime | None = None # When data was last transformed (for cache) - self._last_midnight_turnover_check: datetime | None = None # Last midnight turnover detection check - self._last_actual_turnover: datetime | None = None # When midnight turnover actually happened # Data lifecycle tracking for diagnostic sensor self._lifecycle_state: str = ( @@ -364,16 +403,13 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]): True if midnight turnover is needed, False if already done """ - current_date = now.date() - - # First time check - initialize (no turnover needed) - if self._last_midnight_turnover_check is None: + # Initialize handler on first use + if self._midnight_handler.last_check_time is None: + self._midnight_handler.update_check_time(now) return False - last_check_date = self._last_midnight_turnover_check.date() - - # Turnover needed if we've crossed into a new day - return current_date > last_check_date + # Delegate to midnight handler + return self._midnight_handler.is_turnover_needed(now) def _perform_midnight_data_rotation(self, now: datetime) -> None: """ @@ -391,7 +427,7 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]): """ current_date = now.date() last_check_date = ( - self._last_midnight_turnover_check.date() if self._last_midnight_turnover_check else current_date + self._midnight_handler.last_check_time.date() if self._midnight_handler.last_check_time else current_date ) self._log( @@ -420,9 +456,14 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]): # Main coordinator will have performed rotation already self.data["timestamp"] = now + # CRITICAL: Update _last_price_update to current time after turnover + # This prevents cache_validity from showing "date_mismatch" after midnight + # The data is still valid (just rotated today→yesterday, tomorrow→today) + # Update timestamp to reflect that the data is current for the new day + self._last_price_update = now + # Mark turnover as done for today (atomic update) - self._last_midnight_turnover_check = now - self._last_actual_turnover = now # Record when actual turnover happened + self._midnight_handler.mark_turnover_done(now) @callback def _check_and_handle_midnight_turnover(self, now: datetime) -> bool: @@ -463,30 +504,58 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]): return True - def register_lifecycle_callback(self, callback: Callable[[], None]) -> None: + def register_lifecycle_callback(self, callback: Callable[[], None]) -> Callable[[], None]: """ Register callback for lifecycle state changes (push updates). - This allows the lifecycle sensor to receive immediate updates when - the coordinator's lifecycle state changes, instead of waiting for - the next polling cycle. + This allows sensors to receive immediate updates when the coordinator's + lifecycle state changes, instead of waiting for the next polling cycle. Args: callback: Function to call when lifecycle state changes (typically async_write_ha_state) + Returns: + Callable that unregisters the callback when called + """ if callback not in self._lifecycle_callbacks: self._lifecycle_callbacks.append(callback) + def unregister() -> None: + """Unregister the lifecycle callback.""" + if callback in self._lifecycle_callbacks: + self._lifecycle_callbacks.remove(callback) + + return unregister + def _notify_lifecycle_change(self) -> None: """Notify registered callbacks about lifecycle state change (push update).""" for lifecycle_callback in self._lifecycle_callbacks: lifecycle_callback() async def async_shutdown(self) -> None: - """Shut down the coordinator and clean up timers.""" + """ + Shut down the coordinator and clean up timers. + + Cancels all three timer types: + - Timer #1: API polling (coordinator update timer) + - Timer #2: Quarter-hour entity updates + - Timer #3: Minute timing sensor updates + + Also saves cache to persist any unsaved changes. + """ + # Cancel all timers first self._listener_manager.cancel_timers() + # Save cache to persist any unsaved data + # This ensures we don't lose data if HA is shutting down + try: + await self._store_cache() + self._log("debug", "Cache saved during shutdown") + except OSError as err: + # Log but don't raise - shutdown should complete even if cache save fails + self._log("error", "Failed to save cache during shutdown: %s", err) + def _has_existing_main_coordinator(self) -> bool: """Check if there's already a main coordinator in hass.data.""" domain_data = self.hass.data.get(DOMAIN, {}) @@ -531,9 +600,9 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]): if self._cached_price_data is None and self._cached_user_data is None: await self.load_cache() - # Initialize midnight turnover check on first run - if self._last_midnight_turnover_check is None: - self._last_midnight_turnover_check = current_time + # Initialize midnight handler on first run + if self._midnight_handler.last_check_time is None: + self._midnight_handler.update_check_time(current_time) # CRITICAL: Check for midnight turnover FIRST (before any data operations) # This prevents race condition with Timer #2 (quarter-hour refresh) @@ -560,11 +629,6 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]): try: if self.is_main_entry(): - # Set lifecycle state to refreshing before API call - self._lifecycle_state = "refreshing" - self._is_fetching = True - self._notify_lifecycle_change() # Push update: now refreshing - # Reset API call counter if day changed current_date = current_time.date() if self._last_api_call_date != current_date: @@ -573,18 +637,16 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]): # Main entry fetches data for all homes configured_home_ids = self._get_configured_home_ids() + + # Track last_price_update timestamp before fetch to detect if data actually changed + old_price_update = self._last_price_update + result = await self._data_fetcher.handle_main_entry_update( current_time, configured_home_ids, self._transform_data_for_main_entry, ) - # Update lifecycle tracking after successful fetch - self._is_fetching = False - self._api_calls_today += 1 - self._lifecycle_state = "fresh" # Data just fetched - self._notify_lifecycle_change() # Push update: fresh data available - # CRITICAL: Sync cached data after API call # handle_main_entry_update() updates data_fetcher's cache, we need to sync: # 1. cached_user_data (for new integrations, may be fetched via update_user_data_if_needed()) @@ -593,6 +655,14 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]): self._cached_user_data = self._data_fetcher.cached_user_data self._cached_price_data = self._data_fetcher.cached_price_data self._last_price_update = self._data_fetcher._last_price_update # noqa: SLF001 - Sync for lifecycle tracking + + # Update lifecycle tracking only if we fetched NEW data (timestamp changed) + # This prevents recorder spam from state changes when returning cached data + if self._last_price_update != old_price_update: + self._api_calls_today += 1 + self._lifecycle_state = "fresh" # Data just fetched + self._notify_lifecycle_change() # Push update: fresh data available + return result # Subentries get data from main coordinator (no lifecycle tracking - they don't fetch) return await self._handle_subentry_update() @@ -665,14 +735,17 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]): self._last_price_update = self._data_fetcher._last_price_update # noqa: SLF001 - Sync for lifecycle tracking self._last_user_update = self._data_fetcher._last_user_update # noqa: SLF001 - Sync for lifecycle tracking - # Initialize _last_actual_turnover: If cache is from today, assume turnover happened at midnight + # CRITICAL: Restore midnight handler state from cache + # If cache is from today, assume turnover already happened at midnight + # This allows proper turnover detection after HA restart if self._last_price_update: cache_date = self.time.as_local(self._last_price_update).date() today_date = self.time.as_local(self.time.now()).date() if cache_date == today_date: # Cache is from today, so midnight turnover already happened today_midnight = self.time.as_local(self.time.now()).replace(hour=0, minute=0, second=0, microsecond=0) - self._last_actual_turnover = today_midnight + # Restore handler state: mark today's midnight as last turnover + self._midnight_handler.mark_turnover_done(today_midnight) def _perform_midnight_turnover(self, price_info: dict[str, Any]) -> dict[str, Any]: """ @@ -695,7 +768,7 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]): async def _store_cache(self) -> None: """Store cache data.""" - await self._data_fetcher.store_cache(self._last_midnight_turnover_check) + await self._data_fetcher.store_cache(self._midnight_handler.last_check_time) def _needs_tomorrow_data(self, tomorrow_date: date) -> bool: """Check if tomorrow data is missing or invalid.""" diff --git a/custom_components/tibber_prices/sensor/attributes/lifecycle.py b/custom_components/tibber_prices/sensor/attributes/lifecycle.py index d27ee3a..5a6b947 100644 --- a/custom_components/tibber_prices/sensor/attributes/lifecycle.py +++ b/custom_components/tibber_prices/sensor/attributes/lifecycle.py @@ -83,8 +83,9 @@ def build_lifecycle_attributes( api_calls = lifecycle_calculator.get_api_calls_today() attributes["updates_today"] = api_calls - if coordinator._last_actual_turnover: # noqa: SLF001 - Internal state access for diagnostic display - attributes["last_turnover"] = coordinator._last_actual_turnover.isoformat() # noqa: SLF001 + # Last Turnover Time (from midnight handler) + if coordinator._midnight_handler.last_turnover_time: # noqa: SLF001 - Internal state access for diagnostic display + attributes["last_turnover"] = coordinator._midnight_handler.last_turnover_time.isoformat() # noqa: SLF001 # Last Error (if any) if coordinator.last_exception: diff --git a/custom_components/tibber_prices/sensor/calculators/lifecycle.py b/custom_components/tibber_prices/sensor/calculators/lifecycle.py index 64ea302..5da1775 100644 --- a/custom_components/tibber_prices/sensor/calculators/lifecycle.py +++ b/custom_components/tibber_prices/sensor/calculators/lifecycle.py @@ -15,7 +15,7 @@ from .base import TibberPricesBaseCalculator # Constants for lifecycle state determination FRESH_DATA_THRESHOLD_MINUTES = 5 # Data is "fresh" within 5 minutes of API fetch TOMORROW_CHECK_HOUR = 13 # After 13:00, we actively check for tomorrow data -TURNOVER_WARNING_SECONDS = 300 # Warn 5 minutes before midnight +TURNOVER_WARNING_SECONDS = 900 # Warn 15 minutes before midnight (last quarter-hour: 23:45-00:00) # Constants for 15-minute update boundaries (Timer #1) QUARTER_HOUR_BOUNDARIES = [0, 15, 30, 45] # Minutes when Timer #1 can trigger @@ -34,29 +34,31 @@ class TibberPricesLifecycleCalculator(TibberPricesBaseCalculator): - "fresh": Just fetched from API (within 5 minutes) - "refreshing": Currently fetching data from API - "searching_tomorrow": After 13:00, actively looking for tomorrow data - - "turnover_pending": Midnight is approaching (within 5 minutes) + - "turnover_pending": Last interval of day (23:45-00:00, midnight approaching) - "error": Last API call failed + Priority order (highest to lowest): + 1. refreshing - Active operation has highest priority + 2. error - Errors must be immediately visible + 3. turnover_pending - Important event at 23:45, should stay visible + 4. searching_tomorrow - Stable during search phase (13:00-~15:00) + 5. fresh - Informational only, lowest priority among active states + 6. cached - Default fallback + """ coordinator = self.coordinator current_time = coordinator.time.now() - # Check if actively fetching + # Priority 1: Check if actively fetching (highest priority) if coordinator._is_fetching: # noqa: SLF001 - Internal state access for lifecycle tracking return "refreshing" - # Check if last update failed + # Priority 2: Check if last update failed # If coordinator has last_exception set, the last fetch failed if coordinator.last_exception is not None: return "error" - # Check if data is fresh (within 5 minutes of last API fetch) - if coordinator._last_price_update: # noqa: SLF001 - Internal state access for lifecycle tracking - age = current_time - coordinator._last_price_update # noqa: SLF001 - if age <= timedelta(minutes=FRESH_DATA_THRESHOLD_MINUTES): - return "fresh" - - # Check if midnight turnover is pending (within 15 minutes) + # Priority 3: Check if midnight turnover is pending (last quarter of day: 23:45-00:00) midnight = coordinator.time.as_local(current_time).replace( hour=0, minute=0, second=0, microsecond=0 ) + timedelta(days=1) @@ -64,7 +66,8 @@ class TibberPricesLifecycleCalculator(TibberPricesBaseCalculator): if 0 < time_to_midnight <= TURNOVER_WARNING_SECONDS: # Within 15 minutes of midnight (23:45-00:00) return "turnover_pending" - # Check if we're in tomorrow data search mode (after 13:00 and tomorrow missing) + # Priority 4: Check if we're in tomorrow data search mode (after 13:00 and tomorrow missing) + # This should remain stable during the search phase, not flicker with "fresh" every 15 minutes now_local = coordinator.time.as_local(current_time) if now_local.hour >= TOMORROW_CHECK_HOUR: _, tomorrow_midnight = coordinator.time.get_day_boundaries("today") @@ -72,7 +75,14 @@ class TibberPricesLifecycleCalculator(TibberPricesBaseCalculator): if coordinator._needs_tomorrow_data(tomorrow_date): # noqa: SLF001 - Internal state access return "searching_tomorrow" - # Default: using cached data + # Priority 5: Check if data is fresh (within 5 minutes of last API fetch) + # Lower priority than searching_tomorrow to avoid state flickering during search phase + if coordinator._last_price_update: # noqa: SLF001 - Internal state access for lifecycle tracking + age = current_time - coordinator._last_price_update # noqa: SLF001 + if age <= timedelta(minutes=FRESH_DATA_THRESHOLD_MINUTES): + return "fresh" + + # Priority 6: Default - using cached data return "cached" def get_cache_age_minutes(self) -> int | None: @@ -109,8 +119,26 @@ class TibberPricesLifecycleCalculator(TibberPricesBaseCalculator): tomorrow_date = tomorrow_midnight.date() tomorrow_missing = coordinator._needs_tomorrow_data(tomorrow_date) # noqa: SLF001 - # Case 1: Before 13:00 today - next poll is today at 13:00 (when tomorrow-search begins) + # Case 1: Before 13:00 today - next poll is today at 13:xx:xx (when tomorrow-search begins) if now_local.hour < TOMORROW_CHECK_HOUR: + # Calculate exact time based on Timer #1 offset (minute and second precision) + if coordinator._last_coordinator_update is not None: # noqa: SLF001 + last_update_local = coordinator.time.as_local(coordinator._last_coordinator_update) # noqa: SLF001 + # Timer offset: minutes + seconds past the quarter-hour + minutes_past_quarter = last_update_local.minute % 15 + seconds_offset = last_update_local.second + + # Calculate first timer execution at or after 13:00 today + # Just apply timer offset to 13:00 (first quarter-hour mark >= 13:00) + # Timer runs at X:04:37 → Next poll at 13:04:37 + return now_local.replace( + hour=TOMORROW_CHECK_HOUR, + minute=minutes_past_quarter, + second=seconds_offset, + microsecond=0, + ) + + # Fallback: No timer history yet return now_local.replace(hour=TOMORROW_CHECK_HOUR, minute=0, second=0, microsecond=0) # Case 2: After 13:00 today AND tomorrow data missing - actively polling now @@ -258,12 +286,36 @@ class TibberPricesLifecycleCalculator(TibberPricesBaseCalculator): return "date_mismatch" # Check if cache is stale (older than expected) - age = current_time - coordinator._last_price_update # noqa: SLF001 - # Consider stale if older than 2 hours (8 * 15-minute intervals) - if age > timedelta(hours=2): - return "stale" + # CRITICAL: After midnight turnover, _last_price_update is set to 00:00 + # without new API data. The data is still valid (rotated yesterday→today). + # + # Cache is considered "valid" if EITHER: + # 1. Within normal update interval expectations (age ≤ 2 hours), OR + # 2. Coordinator update cycle ran recently (within last 30 minutes) + # + # Why check _last_coordinator_update? + # - After midnight turnover, _last_price_update stays at 00:00 + # - But coordinator polls every 15 minutes and validates cache + # - If coordinator ran recently, cache was checked and deemed valid + # - This prevents false "stale" status when using rotated data - return "valid" + age = current_time - coordinator._last_price_update # noqa: SLF001 + + # If cache age is within normal expectations (≤2 hours), it's valid + if age <= timedelta(hours=2): + return "valid" + + # Cache is older than 2 hours - check if coordinator validated it recently + # If coordinator ran within last 30 minutes, cache is considered current + # (even if _last_price_update is older, e.g., from midnight turnover) + if coordinator._last_coordinator_update: # noqa: SLF001 - Internal state access + time_since_coordinator_check = current_time - coordinator._last_coordinator_update # noqa: SLF001 + if time_since_coordinator_check <= timedelta(minutes=30): + # Coordinator validated cache recently - it's current + return "valid" + + # Cache is old AND coordinator hasn't validated recently - stale + return "stale" def get_api_calls_today(self) -> int: """Get the number of API calls made today.""" diff --git a/tests/test_cache_age.py b/tests/test_cache_age.py new file mode 100644 index 0000000..956f73d --- /dev/null +++ b/tests/test_cache_age.py @@ -0,0 +1,180 @@ +""" +Unit tests for cache age calculation. + +Tests the get_cache_age_minutes() method which calculates how old +the cached data is in minutes. +""" + +from __future__ import annotations + +from datetime import datetime, timedelta +from unittest.mock import Mock +from zoneinfo import ZoneInfo + +import pytest + +from custom_components.tibber_prices.sensor.calculators.lifecycle import ( + TibberPricesLifecycleCalculator, +) + + +@pytest.mark.unit +def test_cache_age_no_update() -> None: + """ + Test cache age is None when no updates have occurred. + + Scenario: Integration just started, no data fetched yet + Expected: Cache age is None + """ + coordinator = Mock() + coordinator.time = Mock() + + current_time = datetime(2025, 11, 22, 14, 30, 0, tzinfo=ZoneInfo("Europe/Oslo")) + coordinator.time.now.return_value = current_time + coordinator._last_price_update = None # noqa: SLF001 - No update yet! + + calculator = TibberPricesLifecycleCalculator(coordinator) + age = calculator.get_cache_age_minutes() + + assert age is None + + +@pytest.mark.unit +def test_cache_age_recent() -> None: + """ + Test cache age for recent data. + + Scenario: Last update was 5 minutes ago + Expected: Cache age is 5 minutes + """ + coordinator = Mock() + coordinator.time = Mock() + + current_time = datetime(2025, 11, 22, 14, 30, 0, tzinfo=ZoneInfo("Europe/Oslo")) + last_update = current_time - timedelta(minutes=5) + + coordinator.time.now.return_value = current_time + coordinator._last_price_update = last_update # noqa: SLF001 + + calculator = TibberPricesLifecycleCalculator(coordinator) + age = calculator.get_cache_age_minutes() + + assert age == 5 + + +@pytest.mark.unit +def test_cache_age_old() -> None: + """ + Test cache age for older data. + + Scenario: Last update was 90 minutes ago (6 update cycles missed) + Expected: Cache age is 90 minutes + """ + coordinator = Mock() + coordinator.time = Mock() + + current_time = datetime(2025, 11, 22, 14, 30, 0, tzinfo=ZoneInfo("Europe/Oslo")) + last_update = current_time - timedelta(minutes=90) + + coordinator.time.now.return_value = current_time + coordinator._last_price_update = last_update # noqa: SLF001 + + calculator = TibberPricesLifecycleCalculator(coordinator) + age = calculator.get_cache_age_minutes() + + assert age == 90 + + +@pytest.mark.unit +def test_cache_age_exact_minute() -> None: + """ + Test cache age calculation rounds down to minutes. + + Scenario: Last update was 5 minutes and 45 seconds ago + Expected: Cache age is 5 minutes (int conversion truncates) + """ + coordinator = Mock() + coordinator.time = Mock() + + current_time = datetime(2025, 11, 22, 14, 30, 0, tzinfo=ZoneInfo("Europe/Oslo")) + last_update = current_time - timedelta(minutes=5, seconds=45) + + coordinator.time.now.return_value = current_time + coordinator._last_price_update = last_update # noqa: SLF001 + + calculator = TibberPricesLifecycleCalculator(coordinator) + age = calculator.get_cache_age_minutes() + + # int() truncates: 5.75 minutes → 5 + assert age == 5 + + +@pytest.mark.unit +def test_cache_age_zero_fresh_data() -> None: + """ + Test cache age is 0 for brand new data. + + Scenario: Last update was just now (< 60 seconds ago) + Expected: Cache age is 0 minutes + """ + coordinator = Mock() + coordinator.time = Mock() + + current_time = datetime(2025, 11, 22, 14, 30, 0, tzinfo=ZoneInfo("Europe/Oslo")) + last_update = current_time - timedelta(seconds=30) + + coordinator.time.now.return_value = current_time + coordinator._last_price_update = last_update # noqa: SLF001 + + calculator = TibberPricesLifecycleCalculator(coordinator) + age = calculator.get_cache_age_minutes() + + assert age == 0 + + +@pytest.mark.unit +def test_cache_age_multiple_hours() -> None: + """ + Test cache age for very old data (multiple hours). + + Scenario: Last update was 3 hours ago (180 minutes) + Expected: Cache age is 180 minutes + + This could happen if API was down or integration was stopped. + """ + coordinator = Mock() + coordinator.time = Mock() + + current_time = datetime(2025, 11, 22, 14, 30, 0, tzinfo=ZoneInfo("Europe/Oslo")) + last_update = current_time - timedelta(hours=3) + + coordinator.time.now.return_value = current_time + coordinator._last_price_update = last_update # noqa: SLF001 + + calculator = TibberPricesLifecycleCalculator(coordinator) + age = calculator.get_cache_age_minutes() + + assert age == 180 + + +@pytest.mark.unit +def test_cache_age_boundary_60_seconds() -> None: + """ + Test cache age exactly at 60 seconds (1 minute boundary). + + Scenario: Last update was exactly 60 seconds ago + Expected: Cache age is 1 minute + """ + coordinator = Mock() + coordinator.time = Mock() + + current_time = datetime(2025, 11, 22, 14, 30, 0, tzinfo=ZoneInfo("Europe/Oslo")) + last_update = current_time - timedelta(seconds=60) + + coordinator.time.now.return_value = current_time + coordinator._last_price_update = last_update # noqa: SLF001 + + calculator = TibberPricesLifecycleCalculator(coordinator) + age = calculator.get_cache_age_minutes() + + assert age == 1 diff --git a/tests/test_cache_validity.py b/tests/test_cache_validity.py new file mode 100644 index 0000000..ef1b4e5 --- /dev/null +++ b/tests/test_cache_validity.py @@ -0,0 +1,263 @@ +""" +Unit tests for cache validity checks. + +Tests the is_cache_valid() function which determines if cached price data +is still current or needs to be refreshed. +""" + +from __future__ import annotations + +from datetime import datetime +from unittest.mock import Mock +from zoneinfo import ZoneInfo + +import pytest + +from custom_components.tibber_prices.coordinator.cache import ( + TibberPricesCacheData, + is_cache_valid, +) + + +@pytest.mark.unit +def test_cache_valid_same_day() -> None: + """ + Test cache is valid when data is from the same calendar day. + + Scenario: Cache from 10:00, current time 15:00 (same day) + Expected: Cache is valid + """ + time_service = Mock() + cache_time = datetime(2025, 11, 22, 10, 0, 0, tzinfo=ZoneInfo("Europe/Oslo")) + current_time = datetime(2025, 11, 22, 15, 0, 0, tzinfo=ZoneInfo("Europe/Oslo")) + + time_service.now.return_value = current_time + time_service.as_local.side_effect = lambda dt: dt + + cache_data = TibberPricesCacheData( + price_data={"priceInfo": {"today": [1, 2, 3]}}, + user_data={"viewer": {"home": {"id": "test"}}}, + last_price_update=cache_time, + last_user_update=cache_time, + last_midnight_check=None, + ) + + result = is_cache_valid(cache_data, "[TEST]", time=time_service) + + assert result is True + + +@pytest.mark.unit +def test_cache_invalid_different_day() -> None: + """ + Test cache is invalid when data is from a different calendar day. + + Scenario: Cache from yesterday, current time today + Expected: Cache is invalid (date mismatch) + """ + time_service = Mock() + cache_time = datetime(2025, 11, 21, 23, 50, 0, tzinfo=ZoneInfo("Europe/Oslo")) + current_time = datetime(2025, 11, 22, 0, 10, 0, tzinfo=ZoneInfo("Europe/Oslo")) + + time_service.now.return_value = current_time + time_service.as_local.side_effect = lambda dt: dt + + cache_data = TibberPricesCacheData( + price_data={"priceInfo": {"today": [1, 2, 3]}}, + user_data={"viewer": {"home": {"id": "test"}}}, + last_price_update=cache_time, + last_user_update=cache_time, + last_midnight_check=None, + ) + + result = is_cache_valid(cache_data, "[TEST]", time=time_service) + + assert result is False + + +@pytest.mark.unit +def test_cache_invalid_no_price_data() -> None: + """ + Test cache is invalid when no price data exists. + + Scenario: Cache exists but price_data is None + Expected: Cache is invalid + """ + time_service = Mock() + current_time = datetime(2025, 11, 22, 15, 0, 0, tzinfo=ZoneInfo("Europe/Oslo")) + + time_service.now.return_value = current_time + time_service.as_local.side_effect = lambda dt: dt + + cache_data = TibberPricesCacheData( + price_data=None, # No price data! + user_data={"viewer": {"home": {"id": "test"}}}, + last_price_update=current_time, + last_user_update=current_time, + last_midnight_check=None, + ) + + result = is_cache_valid(cache_data, "[TEST]", time=time_service) + + assert result is False + + +@pytest.mark.unit +def test_cache_invalid_no_last_update() -> None: + """ + Test cache is invalid when last_price_update is None. + + Scenario: Cache has data but no update timestamp + Expected: Cache is invalid + """ + time_service = Mock() + current_time = datetime(2025, 11, 22, 15, 0, 0, tzinfo=ZoneInfo("Europe/Oslo")) + + time_service.now.return_value = current_time + time_service.as_local.side_effect = lambda dt: dt + + cache_data = TibberPricesCacheData( + price_data={"priceInfo": {"today": [1, 2, 3]}}, + user_data={"viewer": {"home": {"id": "test"}}}, + last_price_update=None, # No timestamp! + last_user_update=None, + last_midnight_check=None, + ) + + result = is_cache_valid(cache_data, "[TEST]", time=time_service) + + assert result is False + + +@pytest.mark.unit +def test_cache_valid_after_midnight_turnover() -> None: + """ + Test cache validity after midnight turnover with updated timestamp. + + Scenario: Midnight turnover occurred, _last_price_update was updated to new day + Expected: Cache is valid (same date as current) + + This tests the fix for the "date_mismatch" bug where cache appeared invalid + after midnight despite successful data rotation. + """ + time_service = Mock() + # After midnight turnover, _last_price_update should be set to current time + turnover_time = datetime(2025, 11, 22, 0, 0, 5, tzinfo=ZoneInfo("Europe/Oslo")) + current_time = datetime(2025, 11, 22, 0, 10, 0, tzinfo=ZoneInfo("Europe/Oslo")) + + time_service.now.return_value = current_time + time_service.as_local.side_effect = lambda dt: dt + + cache_data = TibberPricesCacheData( + price_data={"priceInfo": {"yesterday": [1], "today": [2], "tomorrow": []}}, + user_data={"viewer": {"home": {"id": "test"}}}, + last_price_update=turnover_time, # Updated during turnover! + last_user_update=turnover_time, + last_midnight_check=turnover_time, + ) + + result = is_cache_valid(cache_data, "[TEST]", time=time_service) + + assert result is True + + +@pytest.mark.unit +def test_cache_invalid_midnight_crossing_without_update() -> None: + """ + Test cache becomes invalid at midnight if timestamp not updated. + + Scenario: HA restarted after midnight, cache still has yesterday's timestamp + Expected: Cache is invalid (would be caught and refreshed) + """ + time_service = Mock() + cache_time = datetime(2025, 11, 21, 23, 55, 0, tzinfo=ZoneInfo("Europe/Oslo")) + current_time = datetime(2025, 11, 22, 0, 5, 0, tzinfo=ZoneInfo("Europe/Oslo")) + + time_service.now.return_value = current_time + time_service.as_local.side_effect = lambda dt: dt + + cache_data = TibberPricesCacheData( + price_data={"priceInfo": {"today": [1, 2, 3]}}, + user_data={"viewer": {"home": {"id": "test"}}}, + last_price_update=cache_time, # Still yesterday! + last_user_update=cache_time, + last_midnight_check=None, + ) + + result = is_cache_valid(cache_data, "[TEST]", time=time_service) + + assert result is False + + +@pytest.mark.unit +def test_cache_validity_timezone_aware() -> None: + """ + Test cache validity uses local timezone for date comparison. + + Scenario: UTC midnight vs local timezone midnight (different dates) + Expected: Comparison done in local timezone, not UTC + + This ensures that midnight turnover happens at local midnight, + not UTC midnight. + """ + time_service = Mock() + + # 23:00 UTC on Nov 21 = 00:00 CET on Nov 22 (UTC+1) + cache_time_utc = datetime(2025, 11, 21, 23, 0, 0, tzinfo=ZoneInfo("UTC")) + current_time_utc = datetime(2025, 11, 21, 23, 30, 0, tzinfo=ZoneInfo("UTC")) + + # Convert to local timezone (CET = UTC+1) + cache_time_local = cache_time_utc.astimezone(ZoneInfo("Europe/Oslo")) # 00:00 Nov 22 + current_time_local = current_time_utc.astimezone(ZoneInfo("Europe/Oslo")) # 00:30 Nov 22 + + time_service.now.return_value = current_time_utc + time_service.as_local.return_value = current_time_local + + cache_data = TibberPricesCacheData( + price_data={"priceInfo": {"today": [1, 2, 3]}}, + user_data={"viewer": {"home": {"id": "test"}}}, + last_price_update=cache_time_utc, + last_user_update=cache_time_utc, + last_midnight_check=None, + ) + + # Mock as_local for cache_time + def as_local_side_effect(dt: datetime) -> datetime: + if dt == cache_time_utc: + return cache_time_local + return current_time_local + + time_service.as_local.side_effect = as_local_side_effect + + result = is_cache_valid(cache_data, "[TEST]", time=time_service) + + # Both times are Nov 22 in local timezone → same date → valid + assert result is True + + +@pytest.mark.unit +def test_cache_validity_exact_midnight_boundary() -> None: + """ + Test cache validity exactly at midnight boundary. + + Scenario: Cache from 23:59:59, current time 00:00:00 + Expected: Cache is invalid (different calendar days) + """ + time_service = Mock() + cache_time = datetime(2025, 11, 21, 23, 59, 59, tzinfo=ZoneInfo("Europe/Oslo")) + current_time = datetime(2025, 11, 22, 0, 0, 0, tzinfo=ZoneInfo("Europe/Oslo")) + + time_service.now.return_value = current_time + time_service.as_local.side_effect = lambda dt: dt + + cache_data = TibberPricesCacheData( + price_data={"priceInfo": {"today": [1, 2, 3]}}, + user_data={"viewer": {"home": {"id": "test"}}}, + last_price_update=cache_time, + last_user_update=cache_time, + last_midnight_check=None, + ) + + result = is_cache_valid(cache_data, "[TEST]", time=time_service) + + assert result is False diff --git a/tests/test_cache_validity_after_midnight.py b/tests/test_cache_validity_after_midnight.py new file mode 100644 index 0000000..fb6c72d --- /dev/null +++ b/tests/test_cache_validity_after_midnight.py @@ -0,0 +1,284 @@ +""" +Test cache validity status after midnight turnover. + +This test verifies that cache_validity correctly reports "valid" after midnight +turnover, even when _last_price_update is 5+ hours old (set to 00:00 during turnover). +The data is still valid because it was rotated (tomorrow→today), not stale. +""" + +from __future__ import annotations + +from datetime import datetime +from unittest.mock import Mock + +import pytest + +from custom_components.tibber_prices.coordinator.time_service import ( + TibberPricesTimeService, +) +from custom_components.tibber_prices.sensor.calculators.lifecycle import ( + TibberPricesLifecycleCalculator, +) + + +@pytest.mark.unit +def test_cache_validity_after_midnight_no_api_calls_within_2h() -> None: + """ + Test cache validity after midnight turnover - within 2 hour window. + + Scenario: + - Midnight turnover happened at 00:00 (set _last_price_update to 00:00) + - Current time: 01:30 (1.5 hours after turnover) + - Coordinator last ran at 01:15 (15 minutes ago) + - Cache age: 1.5 hours < 2 hours → Should be "valid" + + Expected: "valid" (not "stale") + Rationale: Data was rotated at midnight and is less than 2 hours old. + """ + # Create mock coordinator with midnight turnover state + mock_coordinator = Mock(spec=["data", "_last_price_update", "_last_coordinator_update", "time"]) + + # Midnight turnover happened at 00:00 + midnight = datetime(2025, 11, 22, 0, 0, 0) # noqa: DTZ001 - Test uses naive datetime for simplicity + + # Current time: 01:30 (1.5 hours after turnover) + current_time = datetime(2025, 11, 22, 1, 30, 0) # noqa: DTZ001 - Test uses naive datetime + + # Coordinator last checked at 01:15 + coordinator_check_time = datetime(2025, 11, 22, 1, 15, 0) # noqa: DTZ001 - Test uses naive datetime + + # Mock TimeService + mock_time_service = Mock(spec=TibberPricesTimeService) + mock_time_service.now.return_value = current_time + mock_time_service.as_local.side_effect = lambda dt: dt # Assume UTC = local for simplicity + + # Configure coordinator state + mock_coordinator.data = {"priceInfo": {}} # Has data + mock_coordinator._last_price_update = midnight # noqa: SLF001 - Test accesses internal state + mock_coordinator._last_coordinator_update = coordinator_check_time # noqa: SLF001 - Test accesses internal state + mock_coordinator.time = mock_time_service + + # Create calculator + calculator = TibberPricesLifecycleCalculator(mock_coordinator) + + # Get cache validity status + status = calculator.get_cache_validity_status() + + # Should be "valid" - within 2-hour grace period after midnight + assert status == "valid" + + +@pytest.mark.unit +def test_cache_validity_after_midnight_no_api_calls_beyond_2h_coordinator_recent() -> None: + """ + Test cache validity after midnight turnover - beyond 2 hour window BUT coordinator ran recently. + + Scenario: + - Midnight turnover happened at 00:00 (set _last_price_update to 00:00) + - Current time: 05:57 (5 hours 57 minutes after turnover) + - Coordinator last ran at 05:45 (12 minutes ago) + - Cache age: ~6 hours > 2 hours, BUT coordinator checked recently → Should be "valid" + + Expected: "valid" (NOT "stale") + Rationale: Even though _last_price_update is old, coordinator validated cache recently. + """ + # Create mock coordinator with midnight turnover state + mock_coordinator = Mock(spec=["data", "_last_price_update", "_last_coordinator_update", "time"]) + + # Midnight turnover happened at 00:00 + midnight = datetime(2025, 11, 22, 0, 0, 0) # noqa: DTZ001 - Test uses naive datetime + + # Current time: 05:57 (almost 6 hours after turnover) + current_time = datetime(2025, 11, 22, 5, 57, 0) # noqa: DTZ001 - Test uses naive datetime + + # Coordinator last checked at 05:45 (12 minutes ago) + coordinator_check_time = datetime(2025, 11, 22, 5, 45, 0) # noqa: DTZ001 - Test uses naive datetime + + # Mock TimeService + mock_time_service = Mock(spec=TibberPricesTimeService) + mock_time_service.now.return_value = current_time + mock_time_service.as_local.side_effect = lambda dt: dt # Assume UTC = local + + # Configure coordinator state + mock_coordinator.data = {"priceInfo": {}} # Has data + mock_coordinator._last_price_update = midnight # noqa: SLF001 - Test accesses internal state + mock_coordinator._last_coordinator_update = coordinator_check_time # noqa: SLF001 - Test accesses internal state + mock_coordinator.time = mock_time_service + + # Create calculator + calculator = TibberPricesLifecycleCalculator(mock_coordinator) + + # Get cache validity status + status = calculator.get_cache_validity_status() + + # Should be "valid" - coordinator validated cache recently + assert status == "valid" + + +@pytest.mark.unit +def test_cache_validity_after_midnight_beyond_2h_coordinator_old() -> None: + """ + Test cache validity when cache is old AND coordinator hasn't run recently. + + Scenario: + - Midnight turnover happened at 00:00 + - Current time: 05:57 + - Coordinator last ran at 05:00 (57 minutes ago > 30 min threshold) + - Cache age: ~6 hours > 2 hours AND coordinator check old → Should be "stale" + + Expected: "stale" + Rationale: Cache is old and coordinator hasn't validated it recently. + """ + # Create mock coordinator + mock_coordinator = Mock(spec=["data", "_last_price_update", "_last_coordinator_update", "time"]) + + # Midnight turnover happened at 00:00 + midnight = datetime(2025, 11, 22, 0, 0, 0) # noqa: DTZ001 - Test uses naive datetime + + # Current time: 05:57 + current_time = datetime(2025, 11, 22, 5, 57, 0) # noqa: DTZ001 - Test uses naive datetime + + # Coordinator last checked at 05:00 (57 minutes ago - beyond 30 min threshold) + coordinator_check_time = datetime(2025, 11, 22, 5, 0, 0) # noqa: DTZ001 - Test uses naive datetime + + # Mock TimeService + mock_time_service = Mock(spec=TibberPricesTimeService) + mock_time_service.now.return_value = current_time + mock_time_service.as_local.side_effect = lambda dt: dt + + # Configure coordinator state + mock_coordinator.data = {"priceInfo": {}} # Has data + mock_coordinator._last_price_update = midnight # noqa: SLF001 - Test accesses internal state + mock_coordinator._last_coordinator_update = coordinator_check_time # noqa: SLF001 - Test accesses internal state + mock_coordinator.time = mock_time_service + + # Create calculator + calculator = TibberPricesLifecycleCalculator(mock_coordinator) + + # Get cache validity status + status = calculator.get_cache_validity_status() + + # Should be "stale" - cache old and coordinator check also old + assert status == "stale" + + +@pytest.mark.unit +def test_cache_validity_after_midnight_with_api_call() -> None: + """ + Test cache validity after midnight with API call made. + + Scenario: + - API call made at 00:15 (updated _last_price_update to 00:15) + - Current time: 05:57 (5h 42m after last API call) + - Age: ~5h 42m > 2 hours, BUT coordinator ran at 05:45 → Should be "valid" + + Expected: "valid" (NOT "stale") + Rationale: Coordinator validated cache recently (within 30 min). + """ + # Create mock coordinator + mock_coordinator = Mock(spec=["data", "_last_price_update", "_last_coordinator_update", "time"]) + + # API call happened at 00:15 (15 minutes after midnight) + last_api_call = datetime(2025, 11, 22, 0, 15, 0) # noqa: DTZ001 - Test uses naive datetime + + # Current time: 05:57 + current_time = datetime(2025, 11, 22, 5, 57, 0) # noqa: DTZ001 - Test uses naive datetime + + # Coordinator last checked at 05:45 + coordinator_check_time = datetime(2025, 11, 22, 5, 45, 0) # noqa: DTZ001 - Test uses naive datetime + + # Mock TimeService + mock_time_service = Mock(spec=TibberPricesTimeService) + mock_time_service.now.return_value = current_time + mock_time_service.as_local.side_effect = lambda dt: dt + + # Configure coordinator state + mock_coordinator.data = {"priceInfo": {}} # Has data + mock_coordinator._last_price_update = last_api_call # noqa: SLF001 - Test accesses internal state + mock_coordinator._last_coordinator_update = coordinator_check_time # noqa: SLF001 - Test accesses internal state + mock_coordinator.time = mock_time_service + + # Create calculator + calculator = TibberPricesLifecycleCalculator(mock_coordinator) + + # Get cache validity status + status = calculator.get_cache_validity_status() + + # Should be "valid" - coordinator validated recently + assert status == "valid" + + +@pytest.mark.unit +def test_cache_validity_date_mismatch() -> None: + """ + Test cache validity when cache is from yesterday. + + Scenario: + - Cache is from Nov 21 (yesterday) + - Current time: Nov 22, 05:57 (today) + - Should report "date_mismatch" + + Expected: "date_mismatch" + Rationale: Cache is from a different day, turnover didn't happen yet. + """ + # Create mock coordinator + mock_coordinator = Mock(spec=["data", "_last_price_update", "_last_coordinator_update", "time"]) + + # Cache from yesterday + yesterday = datetime(2025, 11, 21, 22, 0, 0) # noqa: DTZ001 - Test uses naive datetime + + # Current time: today 05:57 + current_time = datetime(2025, 11, 22, 5, 57, 0) # noqa: DTZ001 - Test uses naive datetime + + # Mock TimeService + mock_time_service = Mock(spec=TibberPricesTimeService) + mock_time_service.now.return_value = current_time + mock_time_service.as_local.side_effect = lambda dt: dt + + # Configure coordinator state + mock_coordinator.data = {"priceInfo": {}} # Has data + mock_coordinator._last_price_update = yesterday # noqa: SLF001 - Test accesses internal state + mock_coordinator._last_coordinator_update = None # noqa: SLF001 - Test accesses internal state + mock_coordinator.time = mock_time_service + + # Create calculator + calculator = TibberPricesLifecycleCalculator(mock_coordinator) + + # Get cache validity status + status = calculator.get_cache_validity_status() + + # Should be "date_mismatch" - cache is from different day + assert status == "date_mismatch" + + +@pytest.mark.unit +def test_cache_validity_empty_no_data() -> None: + """ + Test cache validity when no data exists. + + Expected: "empty" + """ + mock_coordinator = Mock(spec=["data", "_last_price_update", "_api_calls_today", "time"]) + mock_coordinator.data = None # No data + + calculator = TibberPricesLifecycleCalculator(mock_coordinator) + status = calculator.get_cache_validity_status() + + assert status == "empty" + + +@pytest.mark.unit +def test_cache_validity_empty_no_timestamp() -> None: + """ + Test cache validity when data exists but no timestamp. + + Expected: "empty" + """ + mock_coordinator = Mock(spec=["data", "_last_price_update", "_api_calls_today", "time"]) + mock_coordinator.data = {"priceInfo": {}} # Has data + mock_coordinator._last_price_update = None # noqa: SLF001 - Test accesses internal state + + calculator = TibberPricesLifecycleCalculator(mock_coordinator) + status = calculator.get_cache_validity_status() + + assert status == "empty"