From 78df8a4b17500def55982932061975b368c5ac1c Mon Sep 17 00:00:00 2001 From: Julian Pawlowski <75446+jpawlowski@users.noreply.github.com> Date: Tue, 23 Dec 2025 14:13:34 +0000 Subject: [PATCH] refactor(lifecycle): integrate with Pool for sensor metrics MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace cache-based metrics with Pool as single source of truth: - get_cache_age_minutes() → get_sensor_fetch_age_minutes() (from Pool) - Remove get_cache_validity_status(), get_data_completeness_status() - Add get_pool_stats() for comprehensive pool statistics - Add has_tomorrow_data() using Pool as source Attributes now show: - sensor_intervals_count/expected/has_gaps (protected range) - cache_intervals_total/limit/fill_percent/extra (entire pool) - last_sensor_fetch, cache_oldest/newest_interval timestamps - tomorrow_available based on Pool state Impact: More accurate lifecycle status, consistent with Pool as source of truth, cleaner diagnostic information. --- .../tibber_prices/diagnostics.py | 2 +- .../sensor/attributes/lifecycle.py | 87 +++--- .../sensor/calculators/lifecycle.py | 186 +++++------- tests/test_cache_age.py | 184 +++++++----- tests/test_cache_validity.py | 263 ---------------- tests/test_cache_validity_after_midnight.py | 284 ------------------ 6 files changed, 231 insertions(+), 775 deletions(-) delete mode 100644 tests/test_cache_validity.py delete mode 100644 tests/test_cache_validity_after_midnight.py diff --git a/custom_components/tibber_prices/diagnostics.py b/custom_components/tibber_prices/diagnostics.py index c5278b5..f7cbb89 100644 --- a/custom_components/tibber_prices/diagnostics.py +++ b/custom_components/tibber_prices/diagnostics.py @@ -70,7 +70,7 @@ async def async_get_config_entry_diagnostics( }, "cache_status": { "user_data_cached": coordinator._cached_user_data is not None, # noqa: SLF001 - "price_data_cached": coordinator._cached_price_data is not None, # noqa: SLF001 + "has_price_data": coordinator.data is not None and "priceInfo" in (coordinator.data or {}), "transformer_cache_valid": coordinator._data_transformer._cached_transformed_data is not None, # noqa: SLF001 "period_calculator_cache_valid": coordinator._period_calculator._cached_periods is not None, # noqa: SLF001 }, diff --git a/custom_components/tibber_prices/sensor/attributes/lifecycle.py b/custom_components/tibber_prices/sensor/attributes/lifecycle.py index a7f006a..36e4b48 100644 --- a/custom_components/tibber_prices/sensor/attributes/lifecycle.py +++ b/custom_components/tibber_prices/sensor/attributes/lifecycle.py @@ -13,7 +13,7 @@ if TYPE_CHECKING: ) -# Constants for cache age formatting +# Constants for fetch age formatting MINUTES_PER_HOUR = 60 MINUTES_PER_DAY = 1440 # 24 * 60 @@ -25,7 +25,8 @@ def build_lifecycle_attributes( """ Build attributes for data_lifecycle_status sensor. - Shows comprehensive cache status, data availability, and update timing. + Shows comprehensive pool status, data availability, and update timing. + Separates sensor-related stats from cache stats for clarity. Returns: Dict with lifecycle attributes @@ -33,41 +34,59 @@ def build_lifecycle_attributes( """ attributes: dict[str, Any] = {} - # Cache Status (formatted for readability) - cache_age = lifecycle_calculator.get_cache_age_minutes() - if cache_age is not None: - # Format cache age with units for better readability - if cache_age < MINUTES_PER_HOUR: - attributes["cache_age"] = f"{cache_age} min" - elif cache_age < MINUTES_PER_DAY: # Less than 24 hours - hours = cache_age // MINUTES_PER_HOUR - minutes = cache_age % MINUTES_PER_HOUR - attributes["cache_age"] = f"{hours}h {minutes}min" if minutes > 0 else f"{hours}h" + # === Pool Statistics (source of truth for cached data) === + pool_stats = lifecycle_calculator.get_pool_stats() + if pool_stats: + # --- Sensor Intervals (Protected Range: gestern bis übermorgen) --- + attributes["sensor_intervals_count"] = pool_stats.get("sensor_intervals_count", 0) + attributes["sensor_intervals_expected"] = pool_stats.get("sensor_intervals_expected", 384) + attributes["sensor_intervals_has_gaps"] = pool_stats.get("sensor_intervals_has_gaps", True) + + # --- Cache Statistics (Entire Pool) --- + attributes["cache_intervals_total"] = pool_stats.get("cache_intervals_total", 0) + attributes["cache_intervals_limit"] = pool_stats.get("cache_intervals_limit", 960) + attributes["cache_fill_percent"] = pool_stats.get("cache_fill_percent", 0) + attributes["cache_intervals_extra"] = pool_stats.get("cache_intervals_extra", 0) + + # --- Timestamps --- + last_sensor_fetch = pool_stats.get("last_sensor_fetch") + if last_sensor_fetch: + attributes["last_sensor_fetch"] = last_sensor_fetch + + oldest_interval = pool_stats.get("cache_oldest_interval") + if oldest_interval: + attributes["cache_oldest_interval"] = oldest_interval + + newest_interval = pool_stats.get("cache_newest_interval") + if newest_interval: + attributes["cache_newest_interval"] = newest_interval + + # --- API Fetch Groups (internal tracking) --- + attributes["fetch_groups_count"] = pool_stats.get("fetch_groups_count", 0) + + # === Sensor Fetch Age (human-readable) === + fetch_age = lifecycle_calculator.get_sensor_fetch_age_minutes() + if fetch_age is not None: + # Format fetch age with units for better readability + if fetch_age < MINUTES_PER_HOUR: + attributes["sensor_fetch_age"] = f"{fetch_age} min" + elif fetch_age < MINUTES_PER_DAY: # Less than 24 hours + hours = fetch_age // MINUTES_PER_HOUR + minutes = fetch_age % MINUTES_PER_HOUR + attributes["sensor_fetch_age"] = f"{hours}h {minutes}min" if minutes > 0 else f"{hours}h" else: # 24+ hours - days = cache_age // MINUTES_PER_DAY - hours = (cache_age % MINUTES_PER_DAY) // MINUTES_PER_HOUR - attributes["cache_age"] = f"{days}d {hours}h" if hours > 0 else f"{days}d" + days = fetch_age // MINUTES_PER_DAY + hours = (fetch_age % MINUTES_PER_DAY) // MINUTES_PER_HOUR + attributes["sensor_fetch_age"] = f"{days}d {hours}h" if hours > 0 else f"{days}d" # Keep raw value for automations - attributes["cache_age_minutes"] = cache_age + attributes["sensor_fetch_age_minutes"] = fetch_age - cache_validity = lifecycle_calculator.get_cache_validity_status() - attributes["cache_validity"] = cache_validity - - # Use single "last_update" field instead of duplicating as "last_api_fetch" and "last_cache_update" - if coordinator._last_price_update: # noqa: SLF001 - Internal state access for diagnostic display - attributes["last_update"] = coordinator._last_price_update.isoformat() # noqa: SLF001 - - # Data Availability & Completeness - data_completeness = lifecycle_calculator.get_data_completeness_status() - attributes["data_completeness"] = data_completeness - - attributes["yesterday_available"] = lifecycle_calculator.is_data_available(-1) - attributes["today_available"] = lifecycle_calculator.is_data_available(0) - attributes["tomorrow_available"] = lifecycle_calculator.is_data_available(1) + # === Tomorrow Data Status === + attributes["tomorrow_available"] = lifecycle_calculator.has_tomorrow_data() attributes["tomorrow_expected_after"] = "13:00" - # Next Actions (only show if meaningful) + # === Next Actions === next_poll = lifecycle_calculator.get_next_api_poll_time() if next_poll: # None means data is complete, no more polls needed attributes["next_api_poll"] = next_poll.isoformat() @@ -75,15 +94,15 @@ def build_lifecycle_attributes( next_midnight = lifecycle_calculator.get_next_midnight_turnover_time() attributes["next_midnight_turnover"] = next_midnight.isoformat() - # Update Statistics + # === Update Statistics === api_calls = lifecycle_calculator.get_api_calls_today() attributes["updates_today"] = api_calls - # Last Turnover Time (from midnight handler) + # === Midnight Turnover Info === if coordinator._midnight_handler.last_turnover_time: # noqa: SLF001 - Internal state access for diagnostic display attributes["last_turnover"] = coordinator._midnight_handler.last_turnover_time.isoformat() # noqa: SLF001 - # Last Error (if any) + # === Error Status === if coordinator.last_exception: attributes["last_error"] = str(coordinator.last_exception) diff --git a/custom_components/tibber_prices/sensor/calculators/lifecycle.py b/custom_components/tibber_prices/sensor/calculators/lifecycle.py index 0b0e1bb..072433b 100644 --- a/custom_components/tibber_prices/sensor/calculators/lifecycle.py +++ b/custom_components/tibber_prices/sensor/calculators/lifecycle.py @@ -2,11 +2,8 @@ from __future__ import annotations -from datetime import timedelta -from typing import TYPE_CHECKING - -if TYPE_CHECKING: - from datetime import datetime +from datetime import datetime, timedelta +from typing import Any from custom_components.tibber_prices.coordinator.constants import UPDATE_INTERVAL @@ -82,13 +79,26 @@ class TibberPricesLifecycleCalculator(TibberPricesBaseCalculator): # Priority 6: Default - using cached data return "cached" - def get_cache_age_minutes(self) -> int | None: - """Calculate how many minutes old the cached data is.""" - coordinator = self.coordinator - if not coordinator._last_price_update: # noqa: SLF001 - Internal state access for lifecycle tracking + def get_sensor_fetch_age_minutes(self) -> int | None: + """ + Calculate how many minutes ago sensor data was last fetched. + + Uses the Pool's last_sensor_fetch as the source of truth. + This only counts API fetches for sensor data (protected range), + not service-triggered fetches for chart data. + + Returns: + Minutes since last sensor fetch, or None if no fetch recorded. + + """ + pool_stats = self._get_pool_stats() + if not pool_stats or not pool_stats.get("last_sensor_fetch"): return None - age = coordinator.time.now() - coordinator._last_price_update # noqa: SLF001 + last_fetch = pool_stats["last_sensor_fetch"] + # Parse ISO timestamp + last_fetch_dt = datetime.fromisoformat(last_fetch) + age = self.coordinator.time.now() - last_fetch_dt return int(age.total_seconds() / 60) def get_next_api_poll_time(self) -> datetime | None: @@ -188,108 +198,6 @@ class TibberPricesLifecycleCalculator(TibberPricesBaseCalculator): # Next midnight return now_local.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1) - def is_data_available(self, day_offset: int) -> bool: - """ - Check if data is available for a specific day. - - Args: - day_offset: Day offset (-1=yesterday, 0=today, 1=tomorrow) - - Returns: - True if data exists and is not empty - - """ - if not self.has_data(): - return False - - day_data = self.get_intervals(day_offset) - return bool(day_data) - - def get_data_completeness_status(self) -> str: - """ - Get human-readable data completeness status. - - Returns: - 'complete': All data (yesterday/today/tomorrow) available - 'missing_tomorrow': Only yesterday and today available - 'missing_yesterday': Only today and tomorrow available - 'partial': Only today or some other partial combination - 'no_data': No data available at all - - """ - yesterday_available = self.is_data_available(-1) - today_available = self.is_data_available(0) - tomorrow_available = self.is_data_available(1) - - if yesterday_available and today_available and tomorrow_available: - return "complete" - if yesterday_available and today_available and not tomorrow_available: - return "missing_tomorrow" - if not yesterday_available and today_available and tomorrow_available: - return "missing_yesterday" - if today_available: - return "partial" - return "no_data" - - def get_cache_validity_status(self) -> str: - """ - Get cache validity status. - - Returns: - "valid": Cache is current and matches today's date - "stale": Cache exists but is outdated - "date_mismatch": Cache is from a different day - "empty": No cache data - - """ - coordinator = self.coordinator - # Check if coordinator has data (transformed, ready for entities) - if not self.has_data(): - return "empty" - - # Check if we have price update timestamp - if not coordinator._last_price_update: # noqa: SLF001 - Internal state access for lifecycle tracking - return "empty" - - current_time = coordinator.time.now() - current_local_date = coordinator.time.as_local(current_time).date() - last_update_local_date = coordinator.time.as_local(coordinator._last_price_update).date() # noqa: SLF001 - - if current_local_date != last_update_local_date: - return "date_mismatch" - - # Check if cache is stale (older than expected) - # CRITICAL: After midnight turnover, _last_price_update is set to 00:00 - # without new API data. The data is still valid (rotated yesterday→today). - # - # Cache is considered "valid" if EITHER: - # 1. Within normal update interval expectations (age ≤ 2 hours), OR - # 2. Coordinator update cycle ran recently (within last 30 minutes) - # - # Why check _last_coordinator_update? - # - After midnight turnover, _last_price_update stays at 00:00 - # - But coordinator polls every 15 minutes and validates cache - # - If coordinator ran recently, cache was checked and deemed valid - # - This prevents false "stale" status when using rotated data - - age = current_time - coordinator._last_price_update # noqa: SLF001 - - # If cache age is within normal expectations (≤2 hours), it's valid - if age <= timedelta(hours=2): - return "valid" - - # Cache is older than 2 hours - check if coordinator validated it recently - # If coordinator ran within last 30 minutes, cache is considered current - # (even if _last_price_update is older, e.g., from midnight turnover) - if coordinator._last_coordinator_update: # noqa: SLF001 - Internal state access - time_since_coordinator_check = current_time - coordinator._last_coordinator_update # noqa: SLF001 - if time_since_coordinator_check <= timedelta(minutes=30): - # Coordinator validated cache recently - it's current - return "valid" - - # Cache is old AND coordinator hasn't validated recently - stale - return "stale" - def get_api_calls_today(self) -> int: """Get the number of API calls made today.""" coordinator = self.coordinator @@ -300,3 +208,57 @@ class TibberPricesLifecycleCalculator(TibberPricesBaseCalculator): return 0 return coordinator._api_calls_today # noqa: SLF001 + + def has_tomorrow_data(self) -> bool: + """ + Check if tomorrow's price data is available. + + Returns: + True if tomorrow data exists in the pool. + + """ + return not self.coordinator._needs_tomorrow_data() # noqa: SLF001 + + def get_pool_stats(self) -> dict[str, Any] | None: + """ + Get interval pool statistics. + + Returns: + Dict with pool stats or None if pool not available. + Contains: + - Sensor intervals (protected range): + - sensor_intervals_count: Intervals in protected range + - sensor_intervals_expected: Expected count (usually 384) + - sensor_intervals_has_gaps: True if gaps exist + - Cache statistics: + - cache_intervals_total: Total intervals in cache + - cache_intervals_limit: Maximum cache size + - cache_fill_percent: How full the cache is (%) + - cache_intervals_extra: Intervals outside protected range + - Timestamps: + - last_sensor_fetch: When sensor data was last fetched + - cache_oldest_interval: Oldest interval in cache + - cache_newest_interval: Newest interval in cache + - Metadata: + - fetch_groups_count: Number of API fetch batches stored + + """ + return self._get_pool_stats() + + def _get_pool_stats(self) -> dict[str, Any] | None: + """ + Get pool stats from coordinator. + + Returns: + Pool statistics dict or None. + + """ + coordinator = self.coordinator + # Access the pool via the price data manager + if hasattr(coordinator, "_price_data_manager"): + price_data_manager = coordinator._price_data_manager # noqa: SLF001 + if hasattr(price_data_manager, "_interval_pool"): + pool = price_data_manager._interval_pool # noqa: SLF001 + if pool is not None: + return pool.get_pool_stats() + return None diff --git a/tests/test_cache_age.py b/tests/test_cache_age.py index 956f73d..79be0cd 100644 --- a/tests/test_cache_age.py +++ b/tests/test_cache_age.py @@ -1,8 +1,8 @@ """ -Unit tests for cache age calculation. +Unit tests for sensor fetch age calculation. -Tests the get_cache_age_minutes() method which calculates how old -the cached data is in minutes. +Tests the get_sensor_fetch_age_minutes() method which calculates how old +the sensor data is in minutes (based on last API fetch for sensor intervals). """ from __future__ import annotations @@ -18,163 +18,185 @@ from custom_components.tibber_prices.sensor.calculators.lifecycle import ( ) -@pytest.mark.unit -def test_cache_age_no_update() -> None: - """ - Test cache age is None when no updates have occurred. - - Scenario: Integration just started, no data fetched yet - Expected: Cache age is None - """ +def _create_mock_coordinator_with_pool( + current_time: datetime, + last_sensor_fetch: datetime | None, +) -> Mock: + """Create a mock coordinator with pool stats configured.""" coordinator = Mock() coordinator.time = Mock() - - current_time = datetime(2025, 11, 22, 14, 30, 0, tzinfo=ZoneInfo("Europe/Oslo")) coordinator.time.now.return_value = current_time - coordinator._last_price_update = None # noqa: SLF001 - No update yet! + + # Mock the pool stats access path + mock_pool = Mock() + if last_sensor_fetch is not None: + mock_pool.get_pool_stats.return_value = { + # Sensor intervals (protected range) + "sensor_intervals_count": 384, + "sensor_intervals_expected": 384, + "sensor_intervals_has_gaps": False, + # Cache statistics + "cache_intervals_total": 384, + "cache_intervals_limit": 960, + "cache_fill_percent": 40.0, + "cache_intervals_extra": 0, + # Timestamps + "last_sensor_fetch": last_sensor_fetch.isoformat(), + "cache_oldest_interval": "2025-11-20T00:00:00", + "cache_newest_interval": "2025-11-23T23:45:00", + # Metadata + "fetch_groups_count": 1, + } + else: + mock_pool.get_pool_stats.return_value = { + # Sensor intervals (protected range) + "sensor_intervals_count": 0, + "sensor_intervals_expected": 384, + "sensor_intervals_has_gaps": True, + # Cache statistics + "cache_intervals_total": 0, + "cache_intervals_limit": 960, + "cache_fill_percent": 0, + "cache_intervals_extra": 0, + # Timestamps + "last_sensor_fetch": None, + "cache_oldest_interval": None, + "cache_newest_interval": None, + # Metadata + "fetch_groups_count": 0, + } + + mock_price_data_manager = Mock() + mock_price_data_manager._interval_pool = mock_pool # noqa: SLF001 + + coordinator._price_data_manager = mock_price_data_manager # noqa: SLF001 + + return coordinator + + +@pytest.mark.unit +def test_sensor_fetch_age_no_update() -> None: + """ + Test sensor fetch age is None when no updates have occurred. + + Scenario: Integration just started, no data fetched yet + Expected: Fetch age is None + """ + current_time = datetime(2025, 11, 22, 14, 30, 0, tzinfo=ZoneInfo("Europe/Oslo")) + coordinator = _create_mock_coordinator_with_pool(current_time, None) calculator = TibberPricesLifecycleCalculator(coordinator) - age = calculator.get_cache_age_minutes() + age = calculator.get_sensor_fetch_age_minutes() assert age is None @pytest.mark.unit -def test_cache_age_recent() -> None: +def test_sensor_fetch_age_recent() -> None: """ - Test cache age for recent data. + Test sensor fetch age for recent data. Scenario: Last update was 5 minutes ago - Expected: Cache age is 5 minutes + Expected: Fetch age is 5 minutes """ - coordinator = Mock() - coordinator.time = Mock() - current_time = datetime(2025, 11, 22, 14, 30, 0, tzinfo=ZoneInfo("Europe/Oslo")) - last_update = current_time - timedelta(minutes=5) - - coordinator.time.now.return_value = current_time - coordinator._last_price_update = last_update # noqa: SLF001 + last_fetch = current_time - timedelta(minutes=5) + coordinator = _create_mock_coordinator_with_pool(current_time, last_fetch) calculator = TibberPricesLifecycleCalculator(coordinator) - age = calculator.get_cache_age_minutes() + age = calculator.get_sensor_fetch_age_minutes() assert age == 5 @pytest.mark.unit -def test_cache_age_old() -> None: +def test_sensor_fetch_age_old() -> None: """ - Test cache age for older data. + Test sensor fetch age for older data. Scenario: Last update was 90 minutes ago (6 update cycles missed) - Expected: Cache age is 90 minutes + Expected: Fetch age is 90 minutes """ - coordinator = Mock() - coordinator.time = Mock() - current_time = datetime(2025, 11, 22, 14, 30, 0, tzinfo=ZoneInfo("Europe/Oslo")) - last_update = current_time - timedelta(minutes=90) - - coordinator.time.now.return_value = current_time - coordinator._last_price_update = last_update # noqa: SLF001 + last_fetch = current_time - timedelta(minutes=90) + coordinator = _create_mock_coordinator_with_pool(current_time, last_fetch) calculator = TibberPricesLifecycleCalculator(coordinator) - age = calculator.get_cache_age_minutes() + age = calculator.get_sensor_fetch_age_minutes() assert age == 90 @pytest.mark.unit -def test_cache_age_exact_minute() -> None: +def test_sensor_fetch_age_exact_minute() -> None: """ - Test cache age calculation rounds down to minutes. + Test sensor fetch age calculation rounds down to minutes. Scenario: Last update was 5 minutes and 45 seconds ago - Expected: Cache age is 5 minutes (int conversion truncates) + Expected: Fetch age is 5 minutes (int conversion truncates) """ - coordinator = Mock() - coordinator.time = Mock() - current_time = datetime(2025, 11, 22, 14, 30, 0, tzinfo=ZoneInfo("Europe/Oslo")) - last_update = current_time - timedelta(minutes=5, seconds=45) - - coordinator.time.now.return_value = current_time - coordinator._last_price_update = last_update # noqa: SLF001 + last_fetch = current_time - timedelta(minutes=5, seconds=45) + coordinator = _create_mock_coordinator_with_pool(current_time, last_fetch) calculator = TibberPricesLifecycleCalculator(coordinator) - age = calculator.get_cache_age_minutes() + age = calculator.get_sensor_fetch_age_minutes() # int() truncates: 5.75 minutes → 5 assert age == 5 @pytest.mark.unit -def test_cache_age_zero_fresh_data() -> None: +def test_sensor_fetch_age_zero_fresh_data() -> None: """ - Test cache age is 0 for brand new data. + Test sensor fetch age is 0 for brand new data. Scenario: Last update was just now (< 60 seconds ago) - Expected: Cache age is 0 minutes + Expected: Fetch age is 0 minutes """ - coordinator = Mock() - coordinator.time = Mock() - current_time = datetime(2025, 11, 22, 14, 30, 0, tzinfo=ZoneInfo("Europe/Oslo")) - last_update = current_time - timedelta(seconds=30) - - coordinator.time.now.return_value = current_time - coordinator._last_price_update = last_update # noqa: SLF001 + last_fetch = current_time - timedelta(seconds=30) + coordinator = _create_mock_coordinator_with_pool(current_time, last_fetch) calculator = TibberPricesLifecycleCalculator(coordinator) - age = calculator.get_cache_age_minutes() + age = calculator.get_sensor_fetch_age_minutes() assert age == 0 @pytest.mark.unit -def test_cache_age_multiple_hours() -> None: +def test_sensor_fetch_age_multiple_hours() -> None: """ - Test cache age for very old data (multiple hours). + Test sensor fetch age for very old data (multiple hours). Scenario: Last update was 3 hours ago (180 minutes) - Expected: Cache age is 180 minutes + Expected: Fetch age is 180 minutes This could happen if API was down or integration was stopped. """ - coordinator = Mock() - coordinator.time = Mock() - current_time = datetime(2025, 11, 22, 14, 30, 0, tzinfo=ZoneInfo("Europe/Oslo")) - last_update = current_time - timedelta(hours=3) - - coordinator.time.now.return_value = current_time - coordinator._last_price_update = last_update # noqa: SLF001 + last_fetch = current_time - timedelta(hours=3) + coordinator = _create_mock_coordinator_with_pool(current_time, last_fetch) calculator = TibberPricesLifecycleCalculator(coordinator) - age = calculator.get_cache_age_minutes() + age = calculator.get_sensor_fetch_age_minutes() assert age == 180 @pytest.mark.unit -def test_cache_age_boundary_60_seconds() -> None: +def test_sensor_fetch_age_boundary_60_seconds() -> None: """ - Test cache age exactly at 60 seconds (1 minute boundary). + Test sensor fetch age exactly at 60 seconds (1 minute boundary). Scenario: Last update was exactly 60 seconds ago - Expected: Cache age is 1 minute + Expected: Fetch age is 1 minute """ - coordinator = Mock() - coordinator.time = Mock() - current_time = datetime(2025, 11, 22, 14, 30, 0, tzinfo=ZoneInfo("Europe/Oslo")) - last_update = current_time - timedelta(seconds=60) - - coordinator.time.now.return_value = current_time - coordinator._last_price_update = last_update # noqa: SLF001 + last_fetch = current_time - timedelta(seconds=60) + coordinator = _create_mock_coordinator_with_pool(current_time, last_fetch) calculator = TibberPricesLifecycleCalculator(coordinator) - age = calculator.get_cache_age_minutes() + age = calculator.get_sensor_fetch_age_minutes() assert age == 1 diff --git a/tests/test_cache_validity.py b/tests/test_cache_validity.py deleted file mode 100644 index 0478be4..0000000 --- a/tests/test_cache_validity.py +++ /dev/null @@ -1,263 +0,0 @@ -""" -Unit tests for cache validity checks. - -Tests the is_cache_valid() function which determines if cached price data -is still current or needs to be refreshed. -""" - -from __future__ import annotations - -from datetime import datetime -from unittest.mock import Mock -from zoneinfo import ZoneInfo - -import pytest - -from custom_components.tibber_prices.coordinator.cache import ( - TibberPricesCacheData, - is_cache_valid, -) - - -@pytest.mark.unit -def test_cache_valid_same_day() -> None: - """ - Test cache is valid when data is from the same calendar day. - - Scenario: Cache from 10:00, current time 15:00 (same day) - Expected: Cache is valid - """ - time_service = Mock() - cache_time = datetime(2025, 11, 22, 10, 0, 0, tzinfo=ZoneInfo("Europe/Oslo")) - current_time = datetime(2025, 11, 22, 15, 0, 0, tzinfo=ZoneInfo("Europe/Oslo")) - - time_service.now.return_value = current_time - time_service.as_local.side_effect = lambda dt: dt - - cache_data = TibberPricesCacheData( - price_data={"price_info": [1, 2, 3]}, - user_data={"viewer": {"home": {"id": "test"}}}, - last_price_update=cache_time, - last_user_update=cache_time, - last_midnight_check=None, - ) - - result = is_cache_valid(cache_data, "[TEST]", time=time_service) - - assert result is True - - -@pytest.mark.unit -def test_cache_invalid_different_day() -> None: - """ - Test cache is invalid when data is from a different calendar day. - - Scenario: Cache from yesterday, current time today - Expected: Cache is invalid (date mismatch) - """ - time_service = Mock() - cache_time = datetime(2025, 11, 21, 23, 50, 0, tzinfo=ZoneInfo("Europe/Oslo")) - current_time = datetime(2025, 11, 22, 0, 10, 0, tzinfo=ZoneInfo("Europe/Oslo")) - - time_service.now.return_value = current_time - time_service.as_local.side_effect = lambda dt: dt - - cache_data = TibberPricesCacheData( - price_data={"price_info": [1, 2, 3]}, - user_data={"viewer": {"home": {"id": "test"}}}, - last_price_update=cache_time, - last_user_update=cache_time, - last_midnight_check=None, - ) - - result = is_cache_valid(cache_data, "[TEST]", time=time_service) - - assert result is False - - -@pytest.mark.unit -def test_cache_invalid_no_price_data() -> None: - """ - Test cache is invalid when no price data exists. - - Scenario: Cache exists but price_data is None - Expected: Cache is invalid - """ - time_service = Mock() - current_time = datetime(2025, 11, 22, 15, 0, 0, tzinfo=ZoneInfo("Europe/Oslo")) - - time_service.now.return_value = current_time - time_service.as_local.side_effect = lambda dt: dt - - cache_data = TibberPricesCacheData( - price_data=None, # No price data! - user_data={"viewer": {"home": {"id": "test"}}}, - last_price_update=current_time, - last_user_update=current_time, - last_midnight_check=None, - ) - - result = is_cache_valid(cache_data, "[TEST]", time=time_service) - - assert result is False - - -@pytest.mark.unit -def test_cache_invalid_no_last_update() -> None: - """ - Test cache is invalid when last_price_update is None. - - Scenario: Cache has data but no update timestamp - Expected: Cache is invalid - """ - time_service = Mock() - current_time = datetime(2025, 11, 22, 15, 0, 0, tzinfo=ZoneInfo("Europe/Oslo")) - - time_service.now.return_value = current_time - time_service.as_local.side_effect = lambda dt: dt - - cache_data = TibberPricesCacheData( - price_data={"price_info": [1, 2, 3]}, - user_data={"viewer": {"home": {"id": "test"}}}, - last_price_update=None, # No timestamp! - last_user_update=None, - last_midnight_check=None, - ) - - result = is_cache_valid(cache_data, "[TEST]", time=time_service) - - assert result is False - - -@pytest.mark.unit -def test_cache_valid_after_midnight_turnover() -> None: - """ - Test cache validity after midnight turnover with updated timestamp. - - Scenario: Midnight turnover occurred, _last_price_update was updated to new day - Expected: Cache is valid (same date as current) - - This tests the fix for the "date_mismatch" bug where cache appeared invalid - after midnight despite successful data rotation. - """ - time_service = Mock() - # After midnight turnover, _last_price_update should be set to current time - turnover_time = datetime(2025, 11, 22, 0, 0, 5, tzinfo=ZoneInfo("Europe/Oslo")) - current_time = datetime(2025, 11, 22, 0, 10, 0, tzinfo=ZoneInfo("Europe/Oslo")) - - time_service.now.return_value = current_time - time_service.as_local.side_effect = lambda dt: dt - - cache_data = TibberPricesCacheData( - price_data={"price_info": [1, 2]}, - user_data={"viewer": {"home": {"id": "test"}}}, - last_price_update=turnover_time, # Updated during turnover! - last_user_update=turnover_time, - last_midnight_check=turnover_time, - ) - - result = is_cache_valid(cache_data, "[TEST]", time=time_service) - - assert result is True - - -@pytest.mark.unit -def test_cache_invalid_midnight_crossing_without_update() -> None: - """ - Test cache becomes invalid at midnight if timestamp not updated. - - Scenario: HA restarted after midnight, cache still has yesterday's timestamp - Expected: Cache is invalid (would be caught and refreshed) - """ - time_service = Mock() - cache_time = datetime(2025, 11, 21, 23, 55, 0, tzinfo=ZoneInfo("Europe/Oslo")) - current_time = datetime(2025, 11, 22, 0, 5, 0, tzinfo=ZoneInfo("Europe/Oslo")) - - time_service.now.return_value = current_time - time_service.as_local.side_effect = lambda dt: dt - - cache_data = TibberPricesCacheData( - price_data={"price_info": [1, 2, 3]}, - user_data={"viewer": {"home": {"id": "test"}}}, - last_price_update=cache_time, # Still yesterday! - last_user_update=cache_time, - last_midnight_check=None, - ) - - result = is_cache_valid(cache_data, "[TEST]", time=time_service) - - assert result is False - - -@pytest.mark.unit -def test_cache_validity_timezone_aware() -> None: - """ - Test cache validity uses local timezone for date comparison. - - Scenario: UTC midnight vs local timezone midnight (different dates) - Expected: Comparison done in local timezone, not UTC - - This ensures that midnight turnover happens at local midnight, - not UTC midnight. - """ - time_service = Mock() - - # 23:00 UTC on Nov 21 = 00:00 CET on Nov 22 (UTC+1) - cache_time_utc = datetime(2025, 11, 21, 23, 0, 0, tzinfo=ZoneInfo("UTC")) - current_time_utc = datetime(2025, 11, 21, 23, 30, 0, tzinfo=ZoneInfo("UTC")) - - # Convert to local timezone (CET = UTC+1) - cache_time_local = cache_time_utc.astimezone(ZoneInfo("Europe/Oslo")) # 00:00 Nov 22 - current_time_local = current_time_utc.astimezone(ZoneInfo("Europe/Oslo")) # 00:30 Nov 22 - - time_service.now.return_value = current_time_utc - time_service.as_local.return_value = current_time_local - - cache_data = TibberPricesCacheData( - price_data={"price_info": [1, 2, 3]}, - user_data={"viewer": {"home": {"id": "test"}}}, - last_price_update=cache_time_utc, - last_user_update=cache_time_utc, - last_midnight_check=None, - ) - - # Mock as_local for cache_time - def as_local_side_effect(dt: datetime) -> datetime: - if dt == cache_time_utc: - return cache_time_local - return current_time_local - - time_service.as_local.side_effect = as_local_side_effect - - result = is_cache_valid(cache_data, "[TEST]", time=time_service) - - # Both times are Nov 22 in local timezone → same date → valid - assert result is True - - -@pytest.mark.unit -def test_cache_validity_exact_midnight_boundary() -> None: - """ - Test cache validity exactly at midnight boundary. - - Scenario: Cache from 23:59:59, current time 00:00:00 - Expected: Cache is invalid (different calendar days) - """ - time_service = Mock() - cache_time = datetime(2025, 11, 21, 23, 59, 59, tzinfo=ZoneInfo("Europe/Oslo")) - current_time = datetime(2025, 11, 22, 0, 0, 0, tzinfo=ZoneInfo("Europe/Oslo")) - - time_service.now.return_value = current_time - time_service.as_local.side_effect = lambda dt: dt - - cache_data = TibberPricesCacheData( - price_data={"price_info": [1, 2, 3]}, - user_data={"viewer": {"home": {"id": "test"}}}, - last_price_update=cache_time, - last_user_update=cache_time, - last_midnight_check=None, - ) - - result = is_cache_valid(cache_data, "[TEST]", time=time_service) - - assert result is False diff --git a/tests/test_cache_validity_after_midnight.py b/tests/test_cache_validity_after_midnight.py deleted file mode 100644 index fb6c72d..0000000 --- a/tests/test_cache_validity_after_midnight.py +++ /dev/null @@ -1,284 +0,0 @@ -""" -Test cache validity status after midnight turnover. - -This test verifies that cache_validity correctly reports "valid" after midnight -turnover, even when _last_price_update is 5+ hours old (set to 00:00 during turnover). -The data is still valid because it was rotated (tomorrow→today), not stale. -""" - -from __future__ import annotations - -from datetime import datetime -from unittest.mock import Mock - -import pytest - -from custom_components.tibber_prices.coordinator.time_service import ( - TibberPricesTimeService, -) -from custom_components.tibber_prices.sensor.calculators.lifecycle import ( - TibberPricesLifecycleCalculator, -) - - -@pytest.mark.unit -def test_cache_validity_after_midnight_no_api_calls_within_2h() -> None: - """ - Test cache validity after midnight turnover - within 2 hour window. - - Scenario: - - Midnight turnover happened at 00:00 (set _last_price_update to 00:00) - - Current time: 01:30 (1.5 hours after turnover) - - Coordinator last ran at 01:15 (15 minutes ago) - - Cache age: 1.5 hours < 2 hours → Should be "valid" - - Expected: "valid" (not "stale") - Rationale: Data was rotated at midnight and is less than 2 hours old. - """ - # Create mock coordinator with midnight turnover state - mock_coordinator = Mock(spec=["data", "_last_price_update", "_last_coordinator_update", "time"]) - - # Midnight turnover happened at 00:00 - midnight = datetime(2025, 11, 22, 0, 0, 0) # noqa: DTZ001 - Test uses naive datetime for simplicity - - # Current time: 01:30 (1.5 hours after turnover) - current_time = datetime(2025, 11, 22, 1, 30, 0) # noqa: DTZ001 - Test uses naive datetime - - # Coordinator last checked at 01:15 - coordinator_check_time = datetime(2025, 11, 22, 1, 15, 0) # noqa: DTZ001 - Test uses naive datetime - - # Mock TimeService - mock_time_service = Mock(spec=TibberPricesTimeService) - mock_time_service.now.return_value = current_time - mock_time_service.as_local.side_effect = lambda dt: dt # Assume UTC = local for simplicity - - # Configure coordinator state - mock_coordinator.data = {"priceInfo": {}} # Has data - mock_coordinator._last_price_update = midnight # noqa: SLF001 - Test accesses internal state - mock_coordinator._last_coordinator_update = coordinator_check_time # noqa: SLF001 - Test accesses internal state - mock_coordinator.time = mock_time_service - - # Create calculator - calculator = TibberPricesLifecycleCalculator(mock_coordinator) - - # Get cache validity status - status = calculator.get_cache_validity_status() - - # Should be "valid" - within 2-hour grace period after midnight - assert status == "valid" - - -@pytest.mark.unit -def test_cache_validity_after_midnight_no_api_calls_beyond_2h_coordinator_recent() -> None: - """ - Test cache validity after midnight turnover - beyond 2 hour window BUT coordinator ran recently. - - Scenario: - - Midnight turnover happened at 00:00 (set _last_price_update to 00:00) - - Current time: 05:57 (5 hours 57 minutes after turnover) - - Coordinator last ran at 05:45 (12 minutes ago) - - Cache age: ~6 hours > 2 hours, BUT coordinator checked recently → Should be "valid" - - Expected: "valid" (NOT "stale") - Rationale: Even though _last_price_update is old, coordinator validated cache recently. - """ - # Create mock coordinator with midnight turnover state - mock_coordinator = Mock(spec=["data", "_last_price_update", "_last_coordinator_update", "time"]) - - # Midnight turnover happened at 00:00 - midnight = datetime(2025, 11, 22, 0, 0, 0) # noqa: DTZ001 - Test uses naive datetime - - # Current time: 05:57 (almost 6 hours after turnover) - current_time = datetime(2025, 11, 22, 5, 57, 0) # noqa: DTZ001 - Test uses naive datetime - - # Coordinator last checked at 05:45 (12 minutes ago) - coordinator_check_time = datetime(2025, 11, 22, 5, 45, 0) # noqa: DTZ001 - Test uses naive datetime - - # Mock TimeService - mock_time_service = Mock(spec=TibberPricesTimeService) - mock_time_service.now.return_value = current_time - mock_time_service.as_local.side_effect = lambda dt: dt # Assume UTC = local - - # Configure coordinator state - mock_coordinator.data = {"priceInfo": {}} # Has data - mock_coordinator._last_price_update = midnight # noqa: SLF001 - Test accesses internal state - mock_coordinator._last_coordinator_update = coordinator_check_time # noqa: SLF001 - Test accesses internal state - mock_coordinator.time = mock_time_service - - # Create calculator - calculator = TibberPricesLifecycleCalculator(mock_coordinator) - - # Get cache validity status - status = calculator.get_cache_validity_status() - - # Should be "valid" - coordinator validated cache recently - assert status == "valid" - - -@pytest.mark.unit -def test_cache_validity_after_midnight_beyond_2h_coordinator_old() -> None: - """ - Test cache validity when cache is old AND coordinator hasn't run recently. - - Scenario: - - Midnight turnover happened at 00:00 - - Current time: 05:57 - - Coordinator last ran at 05:00 (57 minutes ago > 30 min threshold) - - Cache age: ~6 hours > 2 hours AND coordinator check old → Should be "stale" - - Expected: "stale" - Rationale: Cache is old and coordinator hasn't validated it recently. - """ - # Create mock coordinator - mock_coordinator = Mock(spec=["data", "_last_price_update", "_last_coordinator_update", "time"]) - - # Midnight turnover happened at 00:00 - midnight = datetime(2025, 11, 22, 0, 0, 0) # noqa: DTZ001 - Test uses naive datetime - - # Current time: 05:57 - current_time = datetime(2025, 11, 22, 5, 57, 0) # noqa: DTZ001 - Test uses naive datetime - - # Coordinator last checked at 05:00 (57 minutes ago - beyond 30 min threshold) - coordinator_check_time = datetime(2025, 11, 22, 5, 0, 0) # noqa: DTZ001 - Test uses naive datetime - - # Mock TimeService - mock_time_service = Mock(spec=TibberPricesTimeService) - mock_time_service.now.return_value = current_time - mock_time_service.as_local.side_effect = lambda dt: dt - - # Configure coordinator state - mock_coordinator.data = {"priceInfo": {}} # Has data - mock_coordinator._last_price_update = midnight # noqa: SLF001 - Test accesses internal state - mock_coordinator._last_coordinator_update = coordinator_check_time # noqa: SLF001 - Test accesses internal state - mock_coordinator.time = mock_time_service - - # Create calculator - calculator = TibberPricesLifecycleCalculator(mock_coordinator) - - # Get cache validity status - status = calculator.get_cache_validity_status() - - # Should be "stale" - cache old and coordinator check also old - assert status == "stale" - - -@pytest.mark.unit -def test_cache_validity_after_midnight_with_api_call() -> None: - """ - Test cache validity after midnight with API call made. - - Scenario: - - API call made at 00:15 (updated _last_price_update to 00:15) - - Current time: 05:57 (5h 42m after last API call) - - Age: ~5h 42m > 2 hours, BUT coordinator ran at 05:45 → Should be "valid" - - Expected: "valid" (NOT "stale") - Rationale: Coordinator validated cache recently (within 30 min). - """ - # Create mock coordinator - mock_coordinator = Mock(spec=["data", "_last_price_update", "_last_coordinator_update", "time"]) - - # API call happened at 00:15 (15 minutes after midnight) - last_api_call = datetime(2025, 11, 22, 0, 15, 0) # noqa: DTZ001 - Test uses naive datetime - - # Current time: 05:57 - current_time = datetime(2025, 11, 22, 5, 57, 0) # noqa: DTZ001 - Test uses naive datetime - - # Coordinator last checked at 05:45 - coordinator_check_time = datetime(2025, 11, 22, 5, 45, 0) # noqa: DTZ001 - Test uses naive datetime - - # Mock TimeService - mock_time_service = Mock(spec=TibberPricesTimeService) - mock_time_service.now.return_value = current_time - mock_time_service.as_local.side_effect = lambda dt: dt - - # Configure coordinator state - mock_coordinator.data = {"priceInfo": {}} # Has data - mock_coordinator._last_price_update = last_api_call # noqa: SLF001 - Test accesses internal state - mock_coordinator._last_coordinator_update = coordinator_check_time # noqa: SLF001 - Test accesses internal state - mock_coordinator.time = mock_time_service - - # Create calculator - calculator = TibberPricesLifecycleCalculator(mock_coordinator) - - # Get cache validity status - status = calculator.get_cache_validity_status() - - # Should be "valid" - coordinator validated recently - assert status == "valid" - - -@pytest.mark.unit -def test_cache_validity_date_mismatch() -> None: - """ - Test cache validity when cache is from yesterday. - - Scenario: - - Cache is from Nov 21 (yesterday) - - Current time: Nov 22, 05:57 (today) - - Should report "date_mismatch" - - Expected: "date_mismatch" - Rationale: Cache is from a different day, turnover didn't happen yet. - """ - # Create mock coordinator - mock_coordinator = Mock(spec=["data", "_last_price_update", "_last_coordinator_update", "time"]) - - # Cache from yesterday - yesterday = datetime(2025, 11, 21, 22, 0, 0) # noqa: DTZ001 - Test uses naive datetime - - # Current time: today 05:57 - current_time = datetime(2025, 11, 22, 5, 57, 0) # noqa: DTZ001 - Test uses naive datetime - - # Mock TimeService - mock_time_service = Mock(spec=TibberPricesTimeService) - mock_time_service.now.return_value = current_time - mock_time_service.as_local.side_effect = lambda dt: dt - - # Configure coordinator state - mock_coordinator.data = {"priceInfo": {}} # Has data - mock_coordinator._last_price_update = yesterday # noqa: SLF001 - Test accesses internal state - mock_coordinator._last_coordinator_update = None # noqa: SLF001 - Test accesses internal state - mock_coordinator.time = mock_time_service - - # Create calculator - calculator = TibberPricesLifecycleCalculator(mock_coordinator) - - # Get cache validity status - status = calculator.get_cache_validity_status() - - # Should be "date_mismatch" - cache is from different day - assert status == "date_mismatch" - - -@pytest.mark.unit -def test_cache_validity_empty_no_data() -> None: - """ - Test cache validity when no data exists. - - Expected: "empty" - """ - mock_coordinator = Mock(spec=["data", "_last_price_update", "_api_calls_today", "time"]) - mock_coordinator.data = None # No data - - calculator = TibberPricesLifecycleCalculator(mock_coordinator) - status = calculator.get_cache_validity_status() - - assert status == "empty" - - -@pytest.mark.unit -def test_cache_validity_empty_no_timestamp() -> None: - """ - Test cache validity when data exists but no timestamp. - - Expected: "empty" - """ - mock_coordinator = Mock(spec=["data", "_last_price_update", "_api_calls_today", "time"]) - mock_coordinator.data = {"priceInfo": {}} # Has data - mock_coordinator._last_price_update = None # noqa: SLF001 - Test accesses internal state - - calculator = TibberPricesLifecycleCalculator(mock_coordinator) - status = calculator.get_cache_validity_status() - - assert status == "empty"