diff --git a/custom_components/tibber_prices/coordinator/constants.py b/custom_components/tibber_prices/coordinator/constants.py index b528b66..d633486 100644 --- a/custom_components/tibber_prices/coordinator/constants.py +++ b/custom_components/tibber_prices/coordinator/constants.py @@ -84,7 +84,11 @@ TIME_SENSITIVE_ENTITY_KEYS = frozenset( "best_price_next_start_time", "peak_price_end_time", "peak_price_next_start_time", - # Lifecycle sensor (needs quarter-hour updates for turnover_pending detection at 23:45) + # Lifecycle sensor needs quarter-hour precision for state transitions: + # - 23:45: turnover_pending (last interval before midnight) + # - 00:00: turnover complete (after midnight API update) + # - 13:00: searching_tomorrow (when tomorrow data search begins) + # Uses state-change filter in _handle_time_sensitive_update() to prevent recorder spam "data_lifecycle_status", } ) diff --git a/custom_components/tibber_prices/coordinator/core.py b/custom_components/tibber_prices/coordinator/core.py index 2e9462c..486164a 100644 --- a/custom_components/tibber_prices/coordinator/core.py +++ b/custom_components/tibber_prices/coordinator/core.py @@ -11,7 +11,6 @@ from homeassistant.helpers.storage import Store from homeassistant.helpers.update_coordinator import DataUpdateCoordinator if TYPE_CHECKING: - from collections.abc import Callable from datetime import date, datetime from homeassistant.config_entries import ConfigEntry @@ -242,16 +241,19 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]): self._last_user_update: datetime | None = None self._user_update_interval = timedelta(days=1) - # Data lifecycle tracking for diagnostic sensor + # Data lifecycle tracking + # Note: _lifecycle_state is used for DIAGNOSTICS only (diagnostics.py export). + # The lifecycle SENSOR calculates its state dynamically in get_lifecycle_state(), + # using: _is_fetching, last_exception, time calculations, _needs_tomorrow_data(), + # and _last_price_update. It does NOT read _lifecycle_state! self._lifecycle_state: str = ( - "cached" # Current state: cached, fresh, refreshing, searching_tomorrow, turnover_pending, error + "cached" # For diagnostics: cached, fresh, refreshing, searching_tomorrow, turnover_pending, error ) - self._last_price_update: datetime | None = None # Tracks when price data was last fetched (for cache_age) + self._last_price_update: datetime | None = None # When price data was last fetched from API self._api_calls_today: int = 0 # Counter for API calls today self._last_api_call_date: date | None = None # Date of last API call (for daily reset) - self._is_fetching: bool = False # Flag to track active API fetch + self._is_fetching: bool = False # Flag to track active API fetch (read by lifecycle sensor) self._last_coordinator_update: datetime | None = None # When Timer #1 last ran (_async_update_data) - self._lifecycle_callbacks: list[Callable[[], None]] = [] # Push-update callbacks for lifecycle sensor # Start timers self._listener_manager.schedule_quarter_hour_refresh(self._handle_quarter_hour_refresh) @@ -550,8 +552,9 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]): # Transition lifecycle state from "fresh" to "cached" if enough time passed # (5 minutes threshold defined in lifecycle calculator) - # Note: With Pool as source of truth, we track "fresh" state based on - # when data was last fetched from the API (tracked by _api_calls_today counter) + # Note: This updates _lifecycle_state for diagnostics only. + # The lifecycle sensor calculates its state dynamically in get_lifecycle_state(), + # checking _last_price_update timestamp directly. if self._lifecycle_state == "fresh": # After 5 minutes, data is considered "cached" (no longer "just fetched") self._lifecycle_state = "cached" @@ -601,9 +604,8 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]): self._last_api_call_date = current_date # Set _is_fetching flag - lifecycle sensor shows "refreshing" during fetch + # Note: Lifecycle sensor reads this flag directly in get_lifecycle_state() self._is_fetching = True - # Immediately notify lifecycle sensor about state change - self.async_update_listeners() # Get current price info to check if tomorrow data already exists current_price_info = self.data.get("priceInfo", []) if self.data else [] @@ -631,6 +633,8 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]): "API call completed: Fetched %d intervals, updating lifecycle to 'fresh'", len(result["priceInfo"]), ) + # Note: _lifecycle_state is for diagnostics only. + # Lifecycle sensor calculates state dynamically from _last_price_update. elif not api_called: # Using cached data - lifecycle stays as is (cached/searching_tomorrow/etc.) _LOGGER.debug( @@ -644,7 +648,8 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]): ) as err: # Reset lifecycle state on error self._is_fetching = False - self._lifecycle_state = "error" + self._lifecycle_state = "error" # For diagnostics + # Note: Lifecycle sensor detects errors via coordinator.last_exception # Track rate limit errors for repair system await self._track_rate_limit_error(err) diff --git a/custom_components/tibber_prices/sensor/attributes/lifecycle.py b/custom_components/tibber_prices/sensor/attributes/lifecycle.py index 36e4b48..fb524a5 100644 --- a/custom_components/tibber_prices/sensor/attributes/lifecycle.py +++ b/custom_components/tibber_prices/sensor/attributes/lifecycle.py @@ -1,4 +1,24 @@ -"""Attribute builders for lifecycle diagnostic sensor.""" +""" +Attribute builders for lifecycle diagnostic sensor. + +This sensor uses event-based updates with state-change filtering to minimize +recorder entries. Only attributes that are relevant to the lifecycle STATE +are included here - attributes that change independently of state belong +in a separate sensor or diagnostics. + +Included attributes (update only on state change): +- tomorrow_available: Whether tomorrow's price data is available +- next_api_poll: When the next API poll will occur (builds user trust) +- updates_today: Number of API calls made today +- last_turnover: When the last midnight turnover occurred +- last_error: Details of the last error (if any) + +Pool statistics (sensor_intervals_count, cache_fill_percent, etc.) are +intentionally NOT included here because they change independently of +the lifecycle state. With state-change filtering, these would become +stale. Pool statistics are available via diagnostics or could be +exposed as a separate sensor if needed. +""" from __future__ import annotations @@ -13,11 +33,6 @@ if TYPE_CHECKING: ) -# Constants for fetch age formatting -MINUTES_PER_HOUR = 60 -MINUTES_PER_DAY = 1440 # 24 * 60 - - def build_lifecycle_attributes( coordinator: TibberPricesDataUpdateCoordinator, lifecycle_calculator: TibberPricesLifecycleCalculator, @@ -25,8 +40,11 @@ def build_lifecycle_attributes( """ Build attributes for data_lifecycle_status sensor. - Shows comprehensive pool status, data availability, and update timing. - Separates sensor-related stats from cache stats for clarity. + Event-based updates with state-change filtering - attributes only update + when the lifecycle STATE changes (fresh→cached, cached→turnover_pending, etc.). + + Only includes attributes that are directly relevant to the lifecycle state. + Pool statistics are intentionally excluded to avoid stale data. Returns: Dict with lifecycle attributes @@ -34,75 +52,31 @@ def build_lifecycle_attributes( """ attributes: dict[str, Any] = {} - # === Pool Statistics (source of truth for cached data) === - pool_stats = lifecycle_calculator.get_pool_stats() - if pool_stats: - # --- Sensor Intervals (Protected Range: gestern bis übermorgen) --- - attributes["sensor_intervals_count"] = pool_stats.get("sensor_intervals_count", 0) - attributes["sensor_intervals_expected"] = pool_stats.get("sensor_intervals_expected", 384) - attributes["sensor_intervals_has_gaps"] = pool_stats.get("sensor_intervals_has_gaps", True) - - # --- Cache Statistics (Entire Pool) --- - attributes["cache_intervals_total"] = pool_stats.get("cache_intervals_total", 0) - attributes["cache_intervals_limit"] = pool_stats.get("cache_intervals_limit", 960) - attributes["cache_fill_percent"] = pool_stats.get("cache_fill_percent", 0) - attributes["cache_intervals_extra"] = pool_stats.get("cache_intervals_extra", 0) - - # --- Timestamps --- - last_sensor_fetch = pool_stats.get("last_sensor_fetch") - if last_sensor_fetch: - attributes["last_sensor_fetch"] = last_sensor_fetch - - oldest_interval = pool_stats.get("cache_oldest_interval") - if oldest_interval: - attributes["cache_oldest_interval"] = oldest_interval - - newest_interval = pool_stats.get("cache_newest_interval") - if newest_interval: - attributes["cache_newest_interval"] = newest_interval - - # --- API Fetch Groups (internal tracking) --- - attributes["fetch_groups_count"] = pool_stats.get("fetch_groups_count", 0) - - # === Sensor Fetch Age (human-readable) === - fetch_age = lifecycle_calculator.get_sensor_fetch_age_minutes() - if fetch_age is not None: - # Format fetch age with units for better readability - if fetch_age < MINUTES_PER_HOUR: - attributes["sensor_fetch_age"] = f"{fetch_age} min" - elif fetch_age < MINUTES_PER_DAY: # Less than 24 hours - hours = fetch_age // MINUTES_PER_HOUR - minutes = fetch_age % MINUTES_PER_HOUR - attributes["sensor_fetch_age"] = f"{hours}h {minutes}min" if minutes > 0 else f"{hours}h" - else: # 24+ hours - days = fetch_age // MINUTES_PER_DAY - hours = (fetch_age % MINUTES_PER_DAY) // MINUTES_PER_HOUR - attributes["sensor_fetch_age"] = f"{days}d {hours}h" if hours > 0 else f"{days}d" - - # Keep raw value for automations - attributes["sensor_fetch_age_minutes"] = fetch_age - # === Tomorrow Data Status === + # Critical for understanding lifecycle state transitions attributes["tomorrow_available"] = lifecycle_calculator.has_tomorrow_data() - attributes["tomorrow_expected_after"] = "13:00" - # === Next Actions === + # === Next API Poll Time === + # Builds user trust: shows when the integration will check for tomorrow data + # - Before 13:00: Shows today 13:00 (when tomorrow-search begins) + # - After 13:00 without tomorrow data: Shows next Timer #1 execution (active polling) + # - After 13:00 with tomorrow data: Shows tomorrow 13:00 (predictive) next_poll = lifecycle_calculator.get_next_api_poll_time() - if next_poll: # None means data is complete, no more polls needed + if next_poll: attributes["next_api_poll"] = next_poll.isoformat() - next_midnight = lifecycle_calculator.get_next_midnight_turnover_time() - attributes["next_midnight_turnover"] = next_midnight.isoformat() - # === Update Statistics === + # Shows API activity - resets at midnight with turnover api_calls = lifecycle_calculator.get_api_calls_today() attributes["updates_today"] = api_calls # === Midnight Turnover Info === - if coordinator._midnight_handler.last_turnover_time: # noqa: SLF001 - Internal state access for diagnostic display + # When was the last successful data rotation + if coordinator._midnight_handler.last_turnover_time: # noqa: SLF001 attributes["last_turnover"] = coordinator._midnight_handler.last_turnover_time.isoformat() # noqa: SLF001 # === Error Status === + # Present only when there's an active error if coordinator.last_exception: attributes["last_error"] = str(coordinator.last_exception) diff --git a/custom_components/tibber_prices/sensor/calculators/lifecycle.py b/custom_components/tibber_prices/sensor/calculators/lifecycle.py index 072433b..bb11d9a 100644 --- a/custom_components/tibber_prices/sensor/calculators/lifecycle.py +++ b/custom_components/tibber_prices/sensor/calculators/lifecycle.py @@ -3,7 +3,6 @@ from __future__ import annotations from datetime import datetime, timedelta -from typing import Any from custom_components.tibber_prices.coordinator.constants import UPDATE_INTERVAL @@ -14,10 +13,6 @@ FRESH_DATA_THRESHOLD_MINUTES = 5 # Data is "fresh" within 5 minutes of API fetc TOMORROW_CHECK_HOUR = 13 # After 13:00, we actively check for tomorrow data TURNOVER_WARNING_SECONDS = 900 # Warn 15 minutes before midnight (last quarter-hour: 23:45-00:00) -# Constants for 15-minute update boundaries (Timer #1) -QUARTER_HOUR_BOUNDARIES = [0, 15, 30, 45] # Minutes when Timer #1 can trigger -LAST_HOUR_OF_DAY = 23 - class TibberPricesLifecycleCalculator(TibberPricesBaseCalculator): """Calculate data lifecycle status and metadata.""" @@ -79,28 +74,6 @@ class TibberPricesLifecycleCalculator(TibberPricesBaseCalculator): # Priority 6: Default - using cached data return "cached" - def get_sensor_fetch_age_minutes(self) -> int | None: - """ - Calculate how many minutes ago sensor data was last fetched. - - Uses the Pool's last_sensor_fetch as the source of truth. - This only counts API fetches for sensor data (protected range), - not service-triggered fetches for chart data. - - Returns: - Minutes since last sensor fetch, or None if no fetch recorded. - - """ - pool_stats = self._get_pool_stats() - if not pool_stats or not pool_stats.get("last_sensor_fetch"): - return None - - last_fetch = pool_stats["last_sensor_fetch"] - # Parse ISO timestamp - last_fetch_dt = datetime.fromisoformat(last_fetch) - age = self.coordinator.time.now() - last_fetch_dt - return int(age.total_seconds() / 60) - def get_next_api_poll_time(self) -> datetime | None: """ Calculate when the next API poll attempt will occur. @@ -189,15 +162,6 @@ class TibberPricesLifecycleCalculator(TibberPricesBaseCalculator): # Fallback: If we don't know timer offset yet, assume 13:00:00 return tomorrow_13 - def get_next_midnight_turnover_time(self) -> datetime: - """Calculate when the next midnight turnover will occur.""" - coordinator = self.coordinator - current_time = coordinator.time.now() - now_local = coordinator.time.as_local(current_time) - - # Next midnight - return now_local.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1) - def get_api_calls_today(self) -> int: """Get the number of API calls made today.""" coordinator = self.coordinator @@ -218,47 +182,3 @@ class TibberPricesLifecycleCalculator(TibberPricesBaseCalculator): """ return not self.coordinator._needs_tomorrow_data() # noqa: SLF001 - - def get_pool_stats(self) -> dict[str, Any] | None: - """ - Get interval pool statistics. - - Returns: - Dict with pool stats or None if pool not available. - Contains: - - Sensor intervals (protected range): - - sensor_intervals_count: Intervals in protected range - - sensor_intervals_expected: Expected count (usually 384) - - sensor_intervals_has_gaps: True if gaps exist - - Cache statistics: - - cache_intervals_total: Total intervals in cache - - cache_intervals_limit: Maximum cache size - - cache_fill_percent: How full the cache is (%) - - cache_intervals_extra: Intervals outside protected range - - Timestamps: - - last_sensor_fetch: When sensor data was last fetched - - cache_oldest_interval: Oldest interval in cache - - cache_newest_interval: Newest interval in cache - - Metadata: - - fetch_groups_count: Number of API fetch batches stored - - """ - return self._get_pool_stats() - - def _get_pool_stats(self) -> dict[str, Any] | None: - """ - Get pool stats from coordinator. - - Returns: - Pool statistics dict or None. - - """ - coordinator = self.coordinator - # Access the pool via the price data manager - if hasattr(coordinator, "_price_data_manager"): - price_data_manager = coordinator._price_data_manager # noqa: SLF001 - if hasattr(price_data_manager, "_interval_pool"): - pool = price_data_manager._interval_pool # noqa: SLF001 - if pool is not None: - return pool.get_pool_stats() - return None diff --git a/custom_components/tibber_prices/sensor/core.py b/custom_components/tibber_prices/sensor/core.py index 5a6e4ba..8e953bf 100644 --- a/custom_components/tibber_prices/sensor/core.py +++ b/custom_components/tibber_prices/sensor/core.py @@ -177,6 +177,9 @@ class TibberPricesSensor(TibberPricesEntity, RestoreSensor): self._value_getter: Callable | None = self._get_value_getter() self._time_sensitive_remove_listener: Callable | None = None self._minute_update_remove_listener: Callable | None = None + # Lifecycle sensor state change detection (for recorder optimization) + # Store as Any because native_value can be str/float/datetime depending on sensor type + self._last_lifecycle_state: Any = None # Chart data export (for chart_data_export sensor) - from binary_sensor self._chart_data_last_update = None # Track last service call timestamp self._chart_data_error = None # Track last service call error @@ -312,7 +315,18 @@ class TibberPricesSensor(TibberPricesEntity, RestoreSensor): # Clear trend calculation cache for trend sensors elif self.entity_description.key in ("current_price_trend", "next_price_trend_change"): self._trend_calculator.clear_calculation_cache() - self.async_write_ha_state() + + # For lifecycle sensor: Only write state if it actually changed (state-change filter) + # This enables precise detection at quarter-hour boundaries (23:45 turnover_pending, + # 13:00 searching_tomorrow, 00:00 turnover complete) without recorder spam + if self.entity_description.key == "data_lifecycle_status": + current_state = self.native_value + if current_state != self._last_lifecycle_state: + self._last_lifecycle_state = current_state + self.async_write_ha_state() + # If state didn't change, skip write to recorder + else: + self.async_write_ha_state() @callback def _handle_minute_update(self, time_service: TibberPricesTimeService) -> None: @@ -347,7 +361,16 @@ class TibberPricesSensor(TibberPricesEntity, RestoreSensor): # Schedule async refresh as a task (we're in a callback) self.hass.async_create_task(self._refresh_chart_metadata()) - super()._handle_coordinator_update() + # For lifecycle sensor: Only write state if it actually changed (event-based filter) + # Prevents excessive recorder entries while keeping quarter-hour update capability + if self.entity_description.key == "data_lifecycle_status": + current_state = self.native_value + if current_state != self._last_lifecycle_state: + self._last_lifecycle_state = current_state + super()._handle_coordinator_update() + # If state didn't change, skip write to recorder + else: + super()._handle_coordinator_update() def _get_value_getter(self) -> Callable | None: """Return the appropriate value getter method based on the sensor type.""" diff --git a/tests/test_cache_age.py b/tests/test_cache_age.py deleted file mode 100644 index 79be0cd..0000000 --- a/tests/test_cache_age.py +++ /dev/null @@ -1,202 +0,0 @@ -""" -Unit tests for sensor fetch age calculation. - -Tests the get_sensor_fetch_age_minutes() method which calculates how old -the sensor data is in minutes (based on last API fetch for sensor intervals). -""" - -from __future__ import annotations - -from datetime import datetime, timedelta -from unittest.mock import Mock -from zoneinfo import ZoneInfo - -import pytest - -from custom_components.tibber_prices.sensor.calculators.lifecycle import ( - TibberPricesLifecycleCalculator, -) - - -def _create_mock_coordinator_with_pool( - current_time: datetime, - last_sensor_fetch: datetime | None, -) -> Mock: - """Create a mock coordinator with pool stats configured.""" - coordinator = Mock() - coordinator.time = Mock() - coordinator.time.now.return_value = current_time - - # Mock the pool stats access path - mock_pool = Mock() - if last_sensor_fetch is not None: - mock_pool.get_pool_stats.return_value = { - # Sensor intervals (protected range) - "sensor_intervals_count": 384, - "sensor_intervals_expected": 384, - "sensor_intervals_has_gaps": False, - # Cache statistics - "cache_intervals_total": 384, - "cache_intervals_limit": 960, - "cache_fill_percent": 40.0, - "cache_intervals_extra": 0, - # Timestamps - "last_sensor_fetch": last_sensor_fetch.isoformat(), - "cache_oldest_interval": "2025-11-20T00:00:00", - "cache_newest_interval": "2025-11-23T23:45:00", - # Metadata - "fetch_groups_count": 1, - } - else: - mock_pool.get_pool_stats.return_value = { - # Sensor intervals (protected range) - "sensor_intervals_count": 0, - "sensor_intervals_expected": 384, - "sensor_intervals_has_gaps": True, - # Cache statistics - "cache_intervals_total": 0, - "cache_intervals_limit": 960, - "cache_fill_percent": 0, - "cache_intervals_extra": 0, - # Timestamps - "last_sensor_fetch": None, - "cache_oldest_interval": None, - "cache_newest_interval": None, - # Metadata - "fetch_groups_count": 0, - } - - mock_price_data_manager = Mock() - mock_price_data_manager._interval_pool = mock_pool # noqa: SLF001 - - coordinator._price_data_manager = mock_price_data_manager # noqa: SLF001 - - return coordinator - - -@pytest.mark.unit -def test_sensor_fetch_age_no_update() -> None: - """ - Test sensor fetch age is None when no updates have occurred. - - Scenario: Integration just started, no data fetched yet - Expected: Fetch age is None - """ - current_time = datetime(2025, 11, 22, 14, 30, 0, tzinfo=ZoneInfo("Europe/Oslo")) - coordinator = _create_mock_coordinator_with_pool(current_time, None) - - calculator = TibberPricesLifecycleCalculator(coordinator) - age = calculator.get_sensor_fetch_age_minutes() - - assert age is None - - -@pytest.mark.unit -def test_sensor_fetch_age_recent() -> None: - """ - Test sensor fetch age for recent data. - - Scenario: Last update was 5 minutes ago - Expected: Fetch age is 5 minutes - """ - current_time = datetime(2025, 11, 22, 14, 30, 0, tzinfo=ZoneInfo("Europe/Oslo")) - last_fetch = current_time - timedelta(minutes=5) - coordinator = _create_mock_coordinator_with_pool(current_time, last_fetch) - - calculator = TibberPricesLifecycleCalculator(coordinator) - age = calculator.get_sensor_fetch_age_minutes() - - assert age == 5 - - -@pytest.mark.unit -def test_sensor_fetch_age_old() -> None: - """ - Test sensor fetch age for older data. - - Scenario: Last update was 90 minutes ago (6 update cycles missed) - Expected: Fetch age is 90 minutes - """ - current_time = datetime(2025, 11, 22, 14, 30, 0, tzinfo=ZoneInfo("Europe/Oslo")) - last_fetch = current_time - timedelta(minutes=90) - coordinator = _create_mock_coordinator_with_pool(current_time, last_fetch) - - calculator = TibberPricesLifecycleCalculator(coordinator) - age = calculator.get_sensor_fetch_age_minutes() - - assert age == 90 - - -@pytest.mark.unit -def test_sensor_fetch_age_exact_minute() -> None: - """ - Test sensor fetch age calculation rounds down to minutes. - - Scenario: Last update was 5 minutes and 45 seconds ago - Expected: Fetch age is 5 minutes (int conversion truncates) - """ - current_time = datetime(2025, 11, 22, 14, 30, 0, tzinfo=ZoneInfo("Europe/Oslo")) - last_fetch = current_time - timedelta(minutes=5, seconds=45) - coordinator = _create_mock_coordinator_with_pool(current_time, last_fetch) - - calculator = TibberPricesLifecycleCalculator(coordinator) - age = calculator.get_sensor_fetch_age_minutes() - - # int() truncates: 5.75 minutes → 5 - assert age == 5 - - -@pytest.mark.unit -def test_sensor_fetch_age_zero_fresh_data() -> None: - """ - Test sensor fetch age is 0 for brand new data. - - Scenario: Last update was just now (< 60 seconds ago) - Expected: Fetch age is 0 minutes - """ - current_time = datetime(2025, 11, 22, 14, 30, 0, tzinfo=ZoneInfo("Europe/Oslo")) - last_fetch = current_time - timedelta(seconds=30) - coordinator = _create_mock_coordinator_with_pool(current_time, last_fetch) - - calculator = TibberPricesLifecycleCalculator(coordinator) - age = calculator.get_sensor_fetch_age_minutes() - - assert age == 0 - - -@pytest.mark.unit -def test_sensor_fetch_age_multiple_hours() -> None: - """ - Test sensor fetch age for very old data (multiple hours). - - Scenario: Last update was 3 hours ago (180 minutes) - Expected: Fetch age is 180 minutes - - This could happen if API was down or integration was stopped. - """ - current_time = datetime(2025, 11, 22, 14, 30, 0, tzinfo=ZoneInfo("Europe/Oslo")) - last_fetch = current_time - timedelta(hours=3) - coordinator = _create_mock_coordinator_with_pool(current_time, last_fetch) - - calculator = TibberPricesLifecycleCalculator(coordinator) - age = calculator.get_sensor_fetch_age_minutes() - - assert age == 180 - - -@pytest.mark.unit -def test_sensor_fetch_age_boundary_60_seconds() -> None: - """ - Test sensor fetch age exactly at 60 seconds (1 minute boundary). - - Scenario: Last update was exactly 60 seconds ago - Expected: Fetch age is 1 minute - """ - current_time = datetime(2025, 11, 22, 14, 30, 0, tzinfo=ZoneInfo("Europe/Oslo")) - last_fetch = current_time - timedelta(seconds=60) - coordinator = _create_mock_coordinator_with_pool(current_time, last_fetch) - - calculator = TibberPricesLifecycleCalculator(coordinator) - age = calculator.get_sensor_fetch_age_minutes() - - assert age == 1 diff --git a/tests/test_sensor_timer_assignment.py b/tests/test_sensor_timer_assignment.py index 8c4cb65..6b11710 100644 --- a/tests/test_sensor_timer_assignment.py +++ b/tests/test_sensor_timer_assignment.py @@ -257,17 +257,27 @@ def test_timing_sensors_use_minute_timer() -> None: ) -def test_lifecycle_sensor_uses_quarter_hour_timer() -> None: +def test_lifecycle_sensor_uses_quarter_hour_timer_with_state_filter() -> None: """ - Test that data lifecycle status sensor uses Timer #2. + Test that data lifecycle status sensor uses Timer #2 WITH state-change filtering. - The lifecycle sensor needs quarter-hour updates to detect: - - Turnover pending at 23:45 (quarter-hour boundary) - - Turnover completed after midnight API update + The lifecycle sensor needs quarter-hour precision for detecting: + - 23:45: turnover_pending (last interval before midnight) + - 00:00: turnover complete (after midnight API update) + - 13:00: searching_tomorrow (when tomorrow data search begins) + + To prevent recorder spam, it uses state-change filtering in both: + - _handle_coordinator_update() (Timer #1) + - _handle_time_sensitive_update() (Timer #2) + + State is only written to recorder if it actually changed. + This reduces recorder entries from ~96/day to ~10-15/day. """ + # Lifecycle sensor MUST be in TIME_SENSITIVE_ENTITY_KEYS for quarter-hour precision assert "data_lifecycle_status" in TIME_SENSITIVE_ENTITY_KEYS, ( - "Lifecycle sensor needs quarter-hour updates to detect turnover_pending\n" - "at 23:45 (last interval before midnight)" + "Lifecycle sensor needs quarter-hour updates for precise state transitions\n" + "at 23:45 (turnover_pending), 00:00 (turnover complete), 13:00 (searching_tomorrow).\n" + "State-change filter in _handle_time_sensitive_update() prevents recorder spam." )