refactor(lifecycle): integrate with Pool for sensor metrics

Replace cache-based metrics with Pool as single source of truth:
- get_cache_age_minutes() → get_sensor_fetch_age_minutes() (from Pool)
- Remove get_cache_validity_status(), get_data_completeness_status()
- Add get_pool_stats() for comprehensive pool statistics
- Add has_tomorrow_data() using Pool as source

Attributes now show:
- sensor_intervals_count/expected/has_gaps (protected range)
- cache_intervals_total/limit/fill_percent/extra (entire pool)
- last_sensor_fetch, cache_oldest/newest_interval timestamps
- tomorrow_available based on Pool state

Impact: More accurate lifecycle status, consistent with Pool as source
of truth, cleaner diagnostic information.
This commit is contained in:
Julian Pawlowski 2025-12-23 14:13:34 +00:00
parent 7adc56bf79
commit 78df8a4b17
6 changed files with 231 additions and 775 deletions

View file

@ -70,7 +70,7 @@ async def async_get_config_entry_diagnostics(
},
"cache_status": {
"user_data_cached": coordinator._cached_user_data is not None, # noqa: SLF001
"price_data_cached": coordinator._cached_price_data is not None, # noqa: SLF001
"has_price_data": coordinator.data is not None and "priceInfo" in (coordinator.data or {}),
"transformer_cache_valid": coordinator._data_transformer._cached_transformed_data is not None, # noqa: SLF001
"period_calculator_cache_valid": coordinator._period_calculator._cached_periods is not None, # noqa: SLF001
},

View file

@ -13,7 +13,7 @@ if TYPE_CHECKING:
)
# Constants for cache age formatting
# Constants for fetch age formatting
MINUTES_PER_HOUR = 60
MINUTES_PER_DAY = 1440 # 24 * 60
@ -25,7 +25,8 @@ def build_lifecycle_attributes(
"""
Build attributes for data_lifecycle_status sensor.
Shows comprehensive cache status, data availability, and update timing.
Shows comprehensive pool status, data availability, and update timing.
Separates sensor-related stats from cache stats for clarity.
Returns:
Dict with lifecycle attributes
@ -33,41 +34,59 @@ def build_lifecycle_attributes(
"""
attributes: dict[str, Any] = {}
# Cache Status (formatted for readability)
cache_age = lifecycle_calculator.get_cache_age_minutes()
if cache_age is not None:
# Format cache age with units for better readability
if cache_age < MINUTES_PER_HOUR:
attributes["cache_age"] = f"{cache_age} min"
elif cache_age < MINUTES_PER_DAY: # Less than 24 hours
hours = cache_age // MINUTES_PER_HOUR
minutes = cache_age % MINUTES_PER_HOUR
attributes["cache_age"] = f"{hours}h {minutes}min" if minutes > 0 else f"{hours}h"
# === Pool Statistics (source of truth for cached data) ===
pool_stats = lifecycle_calculator.get_pool_stats()
if pool_stats:
# --- Sensor Intervals (Protected Range: gestern bis übermorgen) ---
attributes["sensor_intervals_count"] = pool_stats.get("sensor_intervals_count", 0)
attributes["sensor_intervals_expected"] = pool_stats.get("sensor_intervals_expected", 384)
attributes["sensor_intervals_has_gaps"] = pool_stats.get("sensor_intervals_has_gaps", True)
# --- Cache Statistics (Entire Pool) ---
attributes["cache_intervals_total"] = pool_stats.get("cache_intervals_total", 0)
attributes["cache_intervals_limit"] = pool_stats.get("cache_intervals_limit", 960)
attributes["cache_fill_percent"] = pool_stats.get("cache_fill_percent", 0)
attributes["cache_intervals_extra"] = pool_stats.get("cache_intervals_extra", 0)
# --- Timestamps ---
last_sensor_fetch = pool_stats.get("last_sensor_fetch")
if last_sensor_fetch:
attributes["last_sensor_fetch"] = last_sensor_fetch
oldest_interval = pool_stats.get("cache_oldest_interval")
if oldest_interval:
attributes["cache_oldest_interval"] = oldest_interval
newest_interval = pool_stats.get("cache_newest_interval")
if newest_interval:
attributes["cache_newest_interval"] = newest_interval
# --- API Fetch Groups (internal tracking) ---
attributes["fetch_groups_count"] = pool_stats.get("fetch_groups_count", 0)
# === Sensor Fetch Age (human-readable) ===
fetch_age = lifecycle_calculator.get_sensor_fetch_age_minutes()
if fetch_age is not None:
# Format fetch age with units for better readability
if fetch_age < MINUTES_PER_HOUR:
attributes["sensor_fetch_age"] = f"{fetch_age} min"
elif fetch_age < MINUTES_PER_DAY: # Less than 24 hours
hours = fetch_age // MINUTES_PER_HOUR
minutes = fetch_age % MINUTES_PER_HOUR
attributes["sensor_fetch_age"] = f"{hours}h {minutes}min" if minutes > 0 else f"{hours}h"
else: # 24+ hours
days = cache_age // MINUTES_PER_DAY
hours = (cache_age % MINUTES_PER_DAY) // MINUTES_PER_HOUR
attributes["cache_age"] = f"{days}d {hours}h" if hours > 0 else f"{days}d"
days = fetch_age // MINUTES_PER_DAY
hours = (fetch_age % MINUTES_PER_DAY) // MINUTES_PER_HOUR
attributes["sensor_fetch_age"] = f"{days}d {hours}h" if hours > 0 else f"{days}d"
# Keep raw value for automations
attributes["cache_age_minutes"] = cache_age
attributes["sensor_fetch_age_minutes"] = fetch_age
cache_validity = lifecycle_calculator.get_cache_validity_status()
attributes["cache_validity"] = cache_validity
# Use single "last_update" field instead of duplicating as "last_api_fetch" and "last_cache_update"
if coordinator._last_price_update: # noqa: SLF001 - Internal state access for diagnostic display
attributes["last_update"] = coordinator._last_price_update.isoformat() # noqa: SLF001
# Data Availability & Completeness
data_completeness = lifecycle_calculator.get_data_completeness_status()
attributes["data_completeness"] = data_completeness
attributes["yesterday_available"] = lifecycle_calculator.is_data_available(-1)
attributes["today_available"] = lifecycle_calculator.is_data_available(0)
attributes["tomorrow_available"] = lifecycle_calculator.is_data_available(1)
# === Tomorrow Data Status ===
attributes["tomorrow_available"] = lifecycle_calculator.has_tomorrow_data()
attributes["tomorrow_expected_after"] = "13:00"
# Next Actions (only show if meaningful)
# === Next Actions ===
next_poll = lifecycle_calculator.get_next_api_poll_time()
if next_poll: # None means data is complete, no more polls needed
attributes["next_api_poll"] = next_poll.isoformat()
@ -75,15 +94,15 @@ def build_lifecycle_attributes(
next_midnight = lifecycle_calculator.get_next_midnight_turnover_time()
attributes["next_midnight_turnover"] = next_midnight.isoformat()
# Update Statistics
# === Update Statistics ===
api_calls = lifecycle_calculator.get_api_calls_today()
attributes["updates_today"] = api_calls
# Last Turnover Time (from midnight handler)
# === Midnight Turnover Info ===
if coordinator._midnight_handler.last_turnover_time: # noqa: SLF001 - Internal state access for diagnostic display
attributes["last_turnover"] = coordinator._midnight_handler.last_turnover_time.isoformat() # noqa: SLF001
# Last Error (if any)
# === Error Status ===
if coordinator.last_exception:
attributes["last_error"] = str(coordinator.last_exception)

View file

@ -2,11 +2,8 @@
from __future__ import annotations
from datetime import timedelta
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from datetime import datetime
from datetime import datetime, timedelta
from typing import Any
from custom_components.tibber_prices.coordinator.constants import UPDATE_INTERVAL
@ -82,13 +79,26 @@ class TibberPricesLifecycleCalculator(TibberPricesBaseCalculator):
# Priority 6: Default - using cached data
return "cached"
def get_cache_age_minutes(self) -> int | None:
"""Calculate how many minutes old the cached data is."""
coordinator = self.coordinator
if not coordinator._last_price_update: # noqa: SLF001 - Internal state access for lifecycle tracking
def get_sensor_fetch_age_minutes(self) -> int | None:
    """Return minutes elapsed since sensor data was last fetched.

    The Pool's ``last_sensor_fetch`` timestamp is the source of truth;
    it only tracks API fetches for the protected sensor range, not
    service-triggered fetches for chart data.

    Returns:
        Whole minutes since the last sensor fetch (fractions truncated),
        or ``None`` when no fetch has been recorded yet.
    """
    stats = self._get_pool_stats()
    raw_timestamp = stats.get("last_sensor_fetch") if stats else None
    if not raw_timestamp:
        return None
    # NOTE(review): assumes the stored ISO string's tz-awareness matches
    # coordinator.time.now(); mixing naive/aware datetimes would raise.
    fetched_at = datetime.fromisoformat(raw_timestamp)
    elapsed = self.coordinator.time.now() - fetched_at
    return int(elapsed.total_seconds() / 60)
def get_next_api_poll_time(self) -> datetime | None:
@ -188,108 +198,6 @@ class TibberPricesLifecycleCalculator(TibberPricesBaseCalculator):
# Next midnight
return now_local.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1)
def is_data_available(self, day_offset: int) -> bool:
    """Report whether price data exists for a specific day.

    Args:
        day_offset: Day offset (-1=yesterday, 0=today, 1=tomorrow).

    Returns:
        True when coordinator data is present and the day's interval
        list is non-empty.
    """
    # Short-circuits: without any data at all, the day lookup is skipped.
    return bool(self.has_data() and self.get_intervals(day_offset))
def get_data_completeness_status(self) -> str:
    """Summarize which days of price data are present.

    Returns:
        'complete': yesterday, today, and tomorrow all available
        'missing_tomorrow': only yesterday and today available
        'missing_yesterday': only today and tomorrow available
        'partial': only today (or some other partial combination)
        'no_data': today's data is absent entirely
    """
    yesterday = self.is_data_available(-1)
    today = self.is_data_available(0)
    tomorrow = self.is_data_available(1)

    # Without today's data the status is always "no_data", regardless
    # of what the neighbouring days hold.
    if not today:
        return "no_data"
    if yesterday and tomorrow:
        return "complete"
    if yesterday:
        return "missing_tomorrow"
    if tomorrow:
        return "missing_yesterday"
    return "partial"
def get_cache_validity_status(self) -> str:
    """Classify the freshness of the cached price data.

    Returns:
        "valid": cache is current and matches today's date
        "stale": cache exists but is outdated
        "date_mismatch": cache stems from a different local day
        "empty": no cached data or no update timestamp
    """
    coordinator = self.coordinator
    last_update = coordinator._last_price_update  # noqa: SLF001 - Internal state access for lifecycle tracking

    # No transformed data, or no price-update timestamp: nothing cached.
    if not self.has_data() or not last_update:
        return "empty"

    now = coordinator.time.now()
    # Compare calendar days in the *local* timezone so the turnover
    # happens at local midnight, not UTC midnight.
    if coordinator.time.as_local(now).date() != coordinator.time.as_local(last_update).date():
        return "date_mismatch"

    # CRITICAL: after midnight turnover, _last_price_update is pinned to
    # 00:00 without new API data; the rotated (yesterday→today) data is
    # still valid. The cache therefore counts as "valid" if EITHER:
    #   1. its age is within normal update expectations (<= 2 hours), OR
    #   2. the coordinator update cycle ran within the last 30 minutes —
    #      the coordinator polls every 15 minutes and re-validates the
    #      cache, so a recent run means the cache was checked and kept.
    if now - last_update <= timedelta(hours=2):
        return "valid"

    last_check = coordinator._last_coordinator_update  # noqa: SLF001 - Internal state access
    if last_check and now - last_check <= timedelta(minutes=30):
        return "valid"

    # Cache is old AND the coordinator has not validated it recently.
    return "stale"
def get_api_calls_today(self) -> int:
"""Get the number of API calls made today."""
coordinator = self.coordinator
@ -300,3 +208,57 @@ class TibberPricesLifecycleCalculator(TibberPricesBaseCalculator):
return 0
return coordinator._api_calls_today # noqa: SLF001
def has_tomorrow_data(self) -> bool:
    """Report whether tomorrow's price data is available.

    Delegates to the coordinator, which tracks whether tomorrow's
    intervals still need to be fetched from the API.

    Returns:
        True if tomorrow data exists in the pool.
    """
    needs_fetch = self.coordinator._needs_tomorrow_data()  # noqa: SLF001
    return not needs_fetch
def get_pool_stats(self) -> dict[str, Any] | None:
    """Return interval pool statistics, or None if no pool is available.

    The returned dict contains:
        Sensor intervals (protected range):
            - sensor_intervals_count: intervals in the protected range
            - sensor_intervals_expected: expected count (usually 384)
            - sensor_intervals_has_gaps: True if gaps exist
        Cache statistics:
            - cache_intervals_total: total intervals in the cache
            - cache_intervals_limit: maximum cache size
            - cache_fill_percent: how full the cache is (%)
            - cache_intervals_extra: intervals outside the protected range
        Timestamps:
            - last_sensor_fetch: when sensor data was last fetched
            - cache_oldest_interval / cache_newest_interval: cache bounds
        Metadata:
            - fetch_groups_count: number of API fetch batches stored
    """
    # Thin public wrapper around the private accessor.
    return self._get_pool_stats()
def _get_pool_stats(self) -> dict[str, Any] | None:
"""
Get pool stats from coordinator.
Returns:
Pool statistics dict or None.
"""
coordinator = self.coordinator
# Access the pool via the price data manager
if hasattr(coordinator, "_price_data_manager"):
price_data_manager = coordinator._price_data_manager # noqa: SLF001
if hasattr(price_data_manager, "_interval_pool"):
pool = price_data_manager._interval_pool # noqa: SLF001
if pool is not None:
return pool.get_pool_stats()
return None

View file

@ -1,8 +1,8 @@
"""
Unit tests for cache age calculation.
Unit tests for sensor fetch age calculation.
Tests the get_cache_age_minutes() method which calculates how old
the cached data is in minutes.
Tests the get_sensor_fetch_age_minutes() method which calculates how old
the sensor data is in minutes (based on last API fetch for sensor intervals).
"""
from __future__ import annotations
@ -18,163 +18,185 @@ from custom_components.tibber_prices.sensor.calculators.lifecycle import (
)
@pytest.mark.unit
def test_cache_age_no_update() -> None:
"""
Test cache age is None when no updates have occurred.
Scenario: Integration just started, no data fetched yet
Expected: Cache age is None
"""
def _create_mock_coordinator_with_pool(
current_time: datetime,
last_sensor_fetch: datetime | None,
) -> Mock:
"""Create a mock coordinator with pool stats configured."""
coordinator = Mock()
coordinator.time = Mock()
current_time = datetime(2025, 11, 22, 14, 30, 0, tzinfo=ZoneInfo("Europe/Oslo"))
coordinator.time.now.return_value = current_time
coordinator._last_price_update = None # noqa: SLF001 - No update yet!
# Mock the pool stats access path
mock_pool = Mock()
if last_sensor_fetch is not None:
mock_pool.get_pool_stats.return_value = {
# Sensor intervals (protected range)
"sensor_intervals_count": 384,
"sensor_intervals_expected": 384,
"sensor_intervals_has_gaps": False,
# Cache statistics
"cache_intervals_total": 384,
"cache_intervals_limit": 960,
"cache_fill_percent": 40.0,
"cache_intervals_extra": 0,
# Timestamps
"last_sensor_fetch": last_sensor_fetch.isoformat(),
"cache_oldest_interval": "2025-11-20T00:00:00",
"cache_newest_interval": "2025-11-23T23:45:00",
# Metadata
"fetch_groups_count": 1,
}
else:
mock_pool.get_pool_stats.return_value = {
# Sensor intervals (protected range)
"sensor_intervals_count": 0,
"sensor_intervals_expected": 384,
"sensor_intervals_has_gaps": True,
# Cache statistics
"cache_intervals_total": 0,
"cache_intervals_limit": 960,
"cache_fill_percent": 0,
"cache_intervals_extra": 0,
# Timestamps
"last_sensor_fetch": None,
"cache_oldest_interval": None,
"cache_newest_interval": None,
# Metadata
"fetch_groups_count": 0,
}
mock_price_data_manager = Mock()
mock_price_data_manager._interval_pool = mock_pool # noqa: SLF001
coordinator._price_data_manager = mock_price_data_manager # noqa: SLF001
return coordinator
@pytest.mark.unit
def test_sensor_fetch_age_no_update() -> None:
    """
    Test sensor fetch age is None when no updates have occurred.

    Scenario: Integration just started, no data fetched yet
    Expected: Fetch age is None
    """
    now = datetime(2025, 11, 22, 14, 30, 0, tzinfo=ZoneInfo("Europe/Oslo"))
    coordinator = _create_mock_coordinator_with_pool(now, None)
    calculator = TibberPricesLifecycleCalculator(coordinator)

    assert calculator.get_sensor_fetch_age_minutes() is None
@pytest.mark.unit
def test_sensor_fetch_age_recent() -> None:
    """
    Test sensor fetch age for recent data.

    Scenario: Last update was 5 minutes ago
    Expected: Fetch age is 5 minutes
    """
    now = datetime(2025, 11, 22, 14, 30, 0, tzinfo=ZoneInfo("Europe/Oslo"))
    coordinator = _create_mock_coordinator_with_pool(now, now - timedelta(minutes=5))
    calculator = TibberPricesLifecycleCalculator(coordinator)

    assert calculator.get_sensor_fetch_age_minutes() == 5
@pytest.mark.unit
def test_sensor_fetch_age_old() -> None:
    """
    Test sensor fetch age for older data.

    Scenario: Last update was 90 minutes ago (6 update cycles missed)
    Expected: Fetch age is 90 minutes
    """
    now = datetime(2025, 11, 22, 14, 30, 0, tzinfo=ZoneInfo("Europe/Oslo"))
    coordinator = _create_mock_coordinator_with_pool(now, now - timedelta(minutes=90))
    calculator = TibberPricesLifecycleCalculator(coordinator)

    assert calculator.get_sensor_fetch_age_minutes() == 90
@pytest.mark.unit
def test_sensor_fetch_age_exact_minute() -> None:
    """
    Test sensor fetch age calculation rounds down to minutes.

    Scenario: Last update was 5 minutes and 45 seconds ago
    Expected: Fetch age is 5 minutes (int conversion truncates)
    """
    now = datetime(2025, 11, 22, 14, 30, 0, tzinfo=ZoneInfo("Europe/Oslo"))
    coordinator = _create_mock_coordinator_with_pool(now, now - timedelta(minutes=5, seconds=45))
    calculator = TibberPricesLifecycleCalculator(coordinator)

    # int() truncates: 5.75 minutes → 5
    assert calculator.get_sensor_fetch_age_minutes() == 5
@pytest.mark.unit
def test_sensor_fetch_age_zero_fresh_data() -> None:
    """
    Test sensor fetch age is 0 for brand new data.

    Scenario: Last update was just now (< 60 seconds ago)
    Expected: Fetch age is 0 minutes
    """
    now = datetime(2025, 11, 22, 14, 30, 0, tzinfo=ZoneInfo("Europe/Oslo"))
    coordinator = _create_mock_coordinator_with_pool(now, now - timedelta(seconds=30))
    calculator = TibberPricesLifecycleCalculator(coordinator)

    assert calculator.get_sensor_fetch_age_minutes() == 0
@pytest.mark.unit
def test_sensor_fetch_age_multiple_hours() -> None:
    """
    Test sensor fetch age for very old data (multiple hours).

    Scenario: Last update was 3 hours ago (180 minutes)
    Expected: Fetch age is 180 minutes

    This could happen if API was down or integration was stopped.
    """
    now = datetime(2025, 11, 22, 14, 30, 0, tzinfo=ZoneInfo("Europe/Oslo"))
    coordinator = _create_mock_coordinator_with_pool(now, now - timedelta(hours=3))
    calculator = TibberPricesLifecycleCalculator(coordinator)

    assert calculator.get_sensor_fetch_age_minutes() == 180
@pytest.mark.unit
def test_sensor_fetch_age_boundary_60_seconds() -> None:
    """
    Test sensor fetch age exactly at 60 seconds (1 minute boundary).

    Scenario: Last update was exactly 60 seconds ago
    Expected: Fetch age is 1 minute
    """
    now = datetime(2025, 11, 22, 14, 30, 0, tzinfo=ZoneInfo("Europe/Oslo"))
    coordinator = _create_mock_coordinator_with_pool(now, now - timedelta(seconds=60))
    calculator = TibberPricesLifecycleCalculator(coordinator)

    assert calculator.get_sensor_fetch_age_minutes() == 1

View file

@ -1,263 +0,0 @@
"""
Unit tests for cache validity checks.
Tests the is_cache_valid() function which determines if cached price data
is still current or needs to be refreshed.
"""
from __future__ import annotations
from datetime import datetime
from unittest.mock import Mock
from zoneinfo import ZoneInfo
import pytest
from custom_components.tibber_prices.coordinator.cache import (
TibberPricesCacheData,
is_cache_valid,
)
@pytest.mark.unit
def test_cache_valid_same_day() -> None:
"""
Test cache is valid when data is from the same calendar day.
Scenario: Cache from 10:00, current time 15:00 (same day)
Expected: Cache is valid
"""
time_service = Mock()
cache_time = datetime(2025, 11, 22, 10, 0, 0, tzinfo=ZoneInfo("Europe/Oslo"))
current_time = datetime(2025, 11, 22, 15, 0, 0, tzinfo=ZoneInfo("Europe/Oslo"))
time_service.now.return_value = current_time
time_service.as_local.side_effect = lambda dt: dt
cache_data = TibberPricesCacheData(
price_data={"price_info": [1, 2, 3]},
user_data={"viewer": {"home": {"id": "test"}}},
last_price_update=cache_time,
last_user_update=cache_time,
last_midnight_check=None,
)
result = is_cache_valid(cache_data, "[TEST]", time=time_service)
assert result is True
@pytest.mark.unit
def test_cache_invalid_different_day() -> None:
"""
Test cache is invalid when data is from a different calendar day.
Scenario: Cache from yesterday, current time today
Expected: Cache is invalid (date mismatch)
"""
time_service = Mock()
cache_time = datetime(2025, 11, 21, 23, 50, 0, tzinfo=ZoneInfo("Europe/Oslo"))
current_time = datetime(2025, 11, 22, 0, 10, 0, tzinfo=ZoneInfo("Europe/Oslo"))
time_service.now.return_value = current_time
time_service.as_local.side_effect = lambda dt: dt
cache_data = TibberPricesCacheData(
price_data={"price_info": [1, 2, 3]},
user_data={"viewer": {"home": {"id": "test"}}},
last_price_update=cache_time,
last_user_update=cache_time,
last_midnight_check=None,
)
result = is_cache_valid(cache_data, "[TEST]", time=time_service)
assert result is False
@pytest.mark.unit
def test_cache_invalid_no_price_data() -> None:
"""
Test cache is invalid when no price data exists.
Scenario: Cache exists but price_data is None
Expected: Cache is invalid
"""
time_service = Mock()
current_time = datetime(2025, 11, 22, 15, 0, 0, tzinfo=ZoneInfo("Europe/Oslo"))
time_service.now.return_value = current_time
time_service.as_local.side_effect = lambda dt: dt
cache_data = TibberPricesCacheData(
price_data=None, # No price data!
user_data={"viewer": {"home": {"id": "test"}}},
last_price_update=current_time,
last_user_update=current_time,
last_midnight_check=None,
)
result = is_cache_valid(cache_data, "[TEST]", time=time_service)
assert result is False
@pytest.mark.unit
def test_cache_invalid_no_last_update() -> None:
"""
Test cache is invalid when last_price_update is None.
Scenario: Cache has data but no update timestamp
Expected: Cache is invalid
"""
time_service = Mock()
current_time = datetime(2025, 11, 22, 15, 0, 0, tzinfo=ZoneInfo("Europe/Oslo"))
time_service.now.return_value = current_time
time_service.as_local.side_effect = lambda dt: dt
cache_data = TibberPricesCacheData(
price_data={"price_info": [1, 2, 3]},
user_data={"viewer": {"home": {"id": "test"}}},
last_price_update=None, # No timestamp!
last_user_update=None,
last_midnight_check=None,
)
result = is_cache_valid(cache_data, "[TEST]", time=time_service)
assert result is False
@pytest.mark.unit
def test_cache_valid_after_midnight_turnover() -> None:
"""
Test cache validity after midnight turnover with updated timestamp.
Scenario: Midnight turnover occurred, _last_price_update was updated to new day
Expected: Cache is valid (same date as current)
This tests the fix for the "date_mismatch" bug where cache appeared invalid
after midnight despite successful data rotation.
"""
time_service = Mock()
# After midnight turnover, _last_price_update should be set to current time
turnover_time = datetime(2025, 11, 22, 0, 0, 5, tzinfo=ZoneInfo("Europe/Oslo"))
current_time = datetime(2025, 11, 22, 0, 10, 0, tzinfo=ZoneInfo("Europe/Oslo"))
time_service.now.return_value = current_time
time_service.as_local.side_effect = lambda dt: dt
cache_data = TibberPricesCacheData(
price_data={"price_info": [1, 2]},
user_data={"viewer": {"home": {"id": "test"}}},
last_price_update=turnover_time, # Updated during turnover!
last_user_update=turnover_time,
last_midnight_check=turnover_time,
)
result = is_cache_valid(cache_data, "[TEST]", time=time_service)
assert result is True
@pytest.mark.unit
def test_cache_invalid_midnight_crossing_without_update() -> None:
"""
Test cache becomes invalid at midnight if timestamp not updated.
Scenario: HA restarted after midnight, cache still has yesterday's timestamp
Expected: Cache is invalid (would be caught and refreshed)
"""
time_service = Mock()
cache_time = datetime(2025, 11, 21, 23, 55, 0, tzinfo=ZoneInfo("Europe/Oslo"))
current_time = datetime(2025, 11, 22, 0, 5, 0, tzinfo=ZoneInfo("Europe/Oslo"))
time_service.now.return_value = current_time
time_service.as_local.side_effect = lambda dt: dt
cache_data = TibberPricesCacheData(
price_data={"price_info": [1, 2, 3]},
user_data={"viewer": {"home": {"id": "test"}}},
last_price_update=cache_time, # Still yesterday!
last_user_update=cache_time,
last_midnight_check=None,
)
result = is_cache_valid(cache_data, "[TEST]", time=time_service)
assert result is False
@pytest.mark.unit
def test_cache_validity_timezone_aware() -> None:
"""
Test cache validity uses local timezone for date comparison.
Scenario: UTC midnight vs local timezone midnight (different dates)
Expected: Comparison done in local timezone, not UTC
This ensures that midnight turnover happens at local midnight,
not UTC midnight.
"""
time_service = Mock()
# 23:00 UTC on Nov 21 = 00:00 CET on Nov 22 (UTC+1)
cache_time_utc = datetime(2025, 11, 21, 23, 0, 0, tzinfo=ZoneInfo("UTC"))
current_time_utc = datetime(2025, 11, 21, 23, 30, 0, tzinfo=ZoneInfo("UTC"))
# Convert to local timezone (CET = UTC+1)
cache_time_local = cache_time_utc.astimezone(ZoneInfo("Europe/Oslo")) # 00:00 Nov 22
current_time_local = current_time_utc.astimezone(ZoneInfo("Europe/Oslo")) # 00:30 Nov 22
time_service.now.return_value = current_time_utc
time_service.as_local.return_value = current_time_local
cache_data = TibberPricesCacheData(
price_data={"price_info": [1, 2, 3]},
user_data={"viewer": {"home": {"id": "test"}}},
last_price_update=cache_time_utc,
last_user_update=cache_time_utc,
last_midnight_check=None,
)
# Mock as_local for cache_time
def as_local_side_effect(dt: datetime) -> datetime:
if dt == cache_time_utc:
return cache_time_local
return current_time_local
time_service.as_local.side_effect = as_local_side_effect
result = is_cache_valid(cache_data, "[TEST]", time=time_service)
# Both times are Nov 22 in local timezone → same date → valid
assert result is True
@pytest.mark.unit
def test_cache_validity_exact_midnight_boundary() -> None:
"""
Test cache validity exactly at midnight boundary.
Scenario: Cache from 23:59:59, current time 00:00:00
Expected: Cache is invalid (different calendar days)
"""
time_service = Mock()
cache_time = datetime(2025, 11, 21, 23, 59, 59, tzinfo=ZoneInfo("Europe/Oslo"))
current_time = datetime(2025, 11, 22, 0, 0, 0, tzinfo=ZoneInfo("Europe/Oslo"))
time_service.now.return_value = current_time
time_service.as_local.side_effect = lambda dt: dt
cache_data = TibberPricesCacheData(
price_data={"price_info": [1, 2, 3]},
user_data={"viewer": {"home": {"id": "test"}}},
last_price_update=cache_time,
last_user_update=cache_time,
last_midnight_check=None,
)
result = is_cache_valid(cache_data, "[TEST]", time=time_service)
assert result is False

View file

@ -1,284 +0,0 @@
"""
Test cache validity status after midnight turnover.
This test verifies that cache_validity correctly reports "valid" after midnight
turnover, even when _last_price_update is 5+ hours old (set to 00:00 during turnover).
The data is still valid because it was rotated (tomorrow → today), not stale.
"""
from __future__ import annotations
from datetime import datetime
from unittest.mock import Mock
import pytest
from custom_components.tibber_prices.coordinator.time_service import (
TibberPricesTimeService,
)
from custom_components.tibber_prices.sensor.calculators.lifecycle import (
TibberPricesLifecycleCalculator,
)
@pytest.mark.unit
def test_cache_validity_after_midnight_no_api_calls_within_2h() -> None:
"""
Test cache validity after midnight turnover - within 2 hour window.
Scenario:
- Midnight turnover happened at 00:00 (set _last_price_update to 00:00)
- Current time: 01:30 (1.5 hours after turnover)
- Coordinator last ran at 01:15 (15 minutes ago)
- Cache age: 1.5 hours < 2 hours → Should be "valid"
Expected: "valid" (not "stale")
Rationale: Data was rotated at midnight and is less than 2 hours old.
"""
# Create mock coordinator with midnight turnover state
mock_coordinator = Mock(spec=["data", "_last_price_update", "_last_coordinator_update", "time"])
# Midnight turnover happened at 00:00
midnight = datetime(2025, 11, 22, 0, 0, 0) # noqa: DTZ001 - Test uses naive datetime for simplicity
# Current time: 01:30 (1.5 hours after turnover)
current_time = datetime(2025, 11, 22, 1, 30, 0) # noqa: DTZ001 - Test uses naive datetime
# Coordinator last checked at 01:15
coordinator_check_time = datetime(2025, 11, 22, 1, 15, 0) # noqa: DTZ001 - Test uses naive datetime
# Mock TimeService
mock_time_service = Mock(spec=TibberPricesTimeService)
mock_time_service.now.return_value = current_time
mock_time_service.as_local.side_effect = lambda dt: dt # Assume UTC = local for simplicity
# Configure coordinator state
mock_coordinator.data = {"priceInfo": {}} # Has data
mock_coordinator._last_price_update = midnight # noqa: SLF001 - Test accesses internal state
mock_coordinator._last_coordinator_update = coordinator_check_time # noqa: SLF001 - Test accesses internal state
mock_coordinator.time = mock_time_service
# Create calculator
calculator = TibberPricesLifecycleCalculator(mock_coordinator)
# Get cache validity status
status = calculator.get_cache_validity_status()
# Should be "valid" - within 2-hour grace period after midnight
assert status == "valid"
@pytest.mark.unit
def test_cache_validity_after_midnight_no_api_calls_beyond_2h_coordinator_recent() -> None:
    """
    Old cache is still "valid" when the coordinator validated it recently.

    Scenario:
    - Midnight turnover at 00:00 (_last_price_update = 00:00)
    - Now: 05:57 → cache age ~6 h, beyond the 2 h window
    - Coordinator last ran at 05:45 (12 minutes ago)

    Expected: "valid" (NOT "stale") — even though _last_price_update is old,
    the coordinator checked the cache within the last 30 minutes.
    """
    turnover_at = datetime(2025, 11, 22, 0, 0, 0)  # noqa: DTZ001 - Test uses naive datetime
    now = datetime(2025, 11, 22, 5, 57, 0)  # noqa: DTZ001 - Test uses naive datetime
    last_check = datetime(2025, 11, 22, 5, 45, 0)  # noqa: DTZ001 - Test uses naive datetime

    # Fixed clock; local time equals UTC for simplicity.
    time_service = Mock(spec=TibberPricesTimeService)
    time_service.now.return_value = now
    time_service.as_local.side_effect = lambda dt: dt

    # Coordinator: old price data but a fresh coordinator check.
    coordinator = Mock(spec=["data", "_last_price_update", "_last_coordinator_update", "time"])
    coordinator.data = {"priceInfo": {}}
    coordinator._last_price_update = turnover_at  # noqa: SLF001 - Test accesses internal state
    coordinator._last_coordinator_update = last_check  # noqa: SLF001 - Test accesses internal state
    coordinator.time = time_service

    calculator = TibberPricesLifecycleCalculator(coordinator)

    # Recent coordinator validation keeps the cache "valid".
    assert calculator.get_cache_validity_status() == "valid"
@pytest.mark.unit
def test_cache_validity_after_midnight_beyond_2h_coordinator_old() -> None:
    """
    Cache is "stale" when it is old AND the coordinator has not run recently.

    Scenario:
    - Midnight turnover at 00:00
    - Now: 05:57 → cache age ~6 h, beyond the 2 h window
    - Coordinator last ran at 05:00 (57 minutes ago, beyond the 30 min threshold)

    Expected: "stale" — neither the data nor a recent coordinator check
    vouches for cache freshness.
    """
    turnover_at = datetime(2025, 11, 22, 0, 0, 0)  # noqa: DTZ001 - Test uses naive datetime
    now = datetime(2025, 11, 22, 5, 57, 0)  # noqa: DTZ001 - Test uses naive datetime
    last_check = datetime(2025, 11, 22, 5, 0, 0)  # noqa: DTZ001 - Test uses naive datetime

    # Fixed clock; local time equals UTC for simplicity.
    time_service = Mock(spec=TibberPricesTimeService)
    time_service.now.return_value = now
    time_service.as_local.side_effect = lambda dt: dt

    # Coordinator: old data and an equally old coordinator check.
    coordinator = Mock(spec=["data", "_last_price_update", "_last_coordinator_update", "time"])
    coordinator.data = {"priceInfo": {}}
    coordinator._last_price_update = turnover_at  # noqa: SLF001 - Test accesses internal state
    coordinator._last_coordinator_update = last_check  # noqa: SLF001 - Test accesses internal state
    coordinator.time = time_service

    calculator = TibberPricesLifecycleCalculator(coordinator)

    # Both signals are old → "stale".
    assert calculator.get_cache_validity_status() == "stale"
@pytest.mark.unit
def test_cache_validity_after_midnight_with_api_call() -> None:
    """
    Cache refreshed by an early-morning API call stays "valid" with recent checks.

    Scenario:
    - API call at 00:15 updated _last_price_update to 00:15
    - Now: 05:57 → ~5 h 42 m since the API call, beyond the 2 h window
    - Coordinator last ran at 05:45 (within the 30 min threshold)

    Expected: "valid" (NOT "stale") — the coordinator validated the cache
    recently.
    """
    api_call_at = datetime(2025, 11, 22, 0, 15, 0)  # noqa: DTZ001 - Test uses naive datetime
    now = datetime(2025, 11, 22, 5, 57, 0)  # noqa: DTZ001 - Test uses naive datetime
    last_check = datetime(2025, 11, 22, 5, 45, 0)  # noqa: DTZ001 - Test uses naive datetime

    # Fixed clock; local time equals UTC for simplicity.
    time_service = Mock(spec=TibberPricesTimeService)
    time_service.now.return_value = now
    time_service.as_local.side_effect = lambda dt: dt

    # Coordinator: price data from the 00:15 API call, coordinator check fresh.
    coordinator = Mock(spec=["data", "_last_price_update", "_last_coordinator_update", "time"])
    coordinator.data = {"priceInfo": {}}
    coordinator._last_price_update = api_call_at  # noqa: SLF001 - Test accesses internal state
    coordinator._last_coordinator_update = last_check  # noqa: SLF001 - Test accesses internal state
    coordinator.time = time_service

    calculator = TibberPricesLifecycleCalculator(coordinator)

    # Recent coordinator validation → "valid".
    assert calculator.get_cache_validity_status() == "valid"
@pytest.mark.unit
def test_cache_validity_date_mismatch() -> None:
    """
    Cache carried over from the previous day reports "date_mismatch".

    Scenario:
    - Cache written Nov 21 at 22:00 (yesterday)
    - Now: Nov 22, 05:57 (today); no coordinator check recorded

    Expected: "date_mismatch" — cache is from a different day, so the
    midnight turnover has not happened yet.
    """
    cached_yesterday_at = datetime(2025, 11, 21, 22, 0, 0)  # noqa: DTZ001 - Test uses naive datetime
    now = datetime(2025, 11, 22, 5, 57, 0)  # noqa: DTZ001 - Test uses naive datetime

    # Fixed clock; local time equals UTC for simplicity.
    time_service = Mock(spec=TibberPricesTimeService)
    time_service.now.return_value = now
    time_service.as_local.side_effect = lambda dt: dt

    # Coordinator: data timestamped yesterday, no coordinator check at all.
    coordinator = Mock(spec=["data", "_last_price_update", "_last_coordinator_update", "time"])
    coordinator.data = {"priceInfo": {}}
    coordinator._last_price_update = cached_yesterday_at  # noqa: SLF001 - Test accesses internal state
    coordinator._last_coordinator_update = None  # noqa: SLF001 - Test accesses internal state
    coordinator.time = time_service

    calculator = TibberPricesLifecycleCalculator(coordinator)

    # Different calendar day → "date_mismatch".
    assert calculator.get_cache_validity_status() == "date_mismatch"
@pytest.mark.unit
def test_cache_validity_empty_no_data() -> None:
    """
    With no coordinator data at all, cache validity must be "empty".
    """
    # Coordinator with data explicitly unset.
    coordinator = Mock(spec=["data", "_last_price_update", "_api_calls_today", "time"])
    coordinator.data = None

    assert TibberPricesLifecycleCalculator(coordinator).get_cache_validity_status() == "empty"
@pytest.mark.unit
def test_cache_validity_empty_no_timestamp() -> None:
    """
    Data without a last-update timestamp is treated as "empty".
    """
    # Coordinator with price data present but no update timestamp recorded.
    coordinator = Mock(spec=["data", "_last_price_update", "_api_calls_today", "time"])
    coordinator.data = {"priceInfo": {}}
    coordinator._last_price_update = None  # noqa: SLF001 - Test accesses internal state

    assert TibberPricesLifecycleCalculator(coordinator).get_cache_validity_status() == "empty"