fix(sensor): streamline lifecycle attrs and next poll visibility

- Remove pool stats/fetch-age from lifecycle sensor to avoid stale data under state-change filtering; add `next_api_poll` for transparency.
- Clean lifecycle calculator by dropping unused helpers/constants and delete the obsolete cache age test.
- Clarify lifecycle state is diagnostics-only in coordinator comments, keep state-change filtering in timer test, and retain quarter-hour precision notes in constants.
- Keep sensor core aligned with lifecycle state filtering.

Impact: Lifecycle sensor now exposes only state-relevant fields without recorder noise, next API poll is visible, and dead code/tests tied to removed attributes are gone.
This commit is contained in:
Julian Pawlowski 2025-12-26 12:13:36 +00:00
parent 665fac10fc
commit 09a50dccff
7 changed files with 100 additions and 366 deletions

View file

@ -84,7 +84,11 @@ TIME_SENSITIVE_ENTITY_KEYS = frozenset(
"best_price_next_start_time",
"peak_price_end_time",
"peak_price_next_start_time",
# Lifecycle sensor (needs quarter-hour updates for turnover_pending detection at 23:45)
# Lifecycle sensor needs quarter-hour precision for state transitions:
# - 23:45: turnover_pending (last interval before midnight)
# - 00:00: turnover complete (after midnight API update)
# - 13:00: searching_tomorrow (when tomorrow data search begins)
# Uses state-change filter in _handle_time_sensitive_update() to prevent recorder spam
"data_lifecycle_status",
}
)

View file

@ -11,7 +11,6 @@ from homeassistant.helpers.storage import Store
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
if TYPE_CHECKING:
from collections.abc import Callable
from datetime import date, datetime
from homeassistant.config_entries import ConfigEntry
@ -242,16 +241,19 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
self._last_user_update: datetime | None = None
self._user_update_interval = timedelta(days=1)
# Data lifecycle tracking for diagnostic sensor
# Data lifecycle tracking
# Note: _lifecycle_state is used for DIAGNOSTICS only (diagnostics.py export).
# The lifecycle SENSOR calculates its state dynamically in get_lifecycle_state(),
# using: _is_fetching, last_exception, time calculations, _needs_tomorrow_data(),
# and _last_price_update. It does NOT read _lifecycle_state!
self._lifecycle_state: str = (
"cached" # Current state: cached, fresh, refreshing, searching_tomorrow, turnover_pending, error
"cached" # For diagnostics: cached, fresh, refreshing, searching_tomorrow, turnover_pending, error
)
self._last_price_update: datetime | None = None # Tracks when price data was last fetched (for cache_age)
self._last_price_update: datetime | None = None # When price data was last fetched from API
self._api_calls_today: int = 0 # Counter for API calls today
self._last_api_call_date: date | None = None # Date of last API call (for daily reset)
self._is_fetching: bool = False # Flag to track active API fetch
self._is_fetching: bool = False # Flag to track active API fetch (read by lifecycle sensor)
self._last_coordinator_update: datetime | None = None # When Timer #1 last ran (_async_update_data)
self._lifecycle_callbacks: list[Callable[[], None]] = [] # Push-update callbacks for lifecycle sensor
# Start timers
self._listener_manager.schedule_quarter_hour_refresh(self._handle_quarter_hour_refresh)
@ -550,8 +552,9 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
# Transition lifecycle state from "fresh" to "cached" if enough time passed
# (5 minutes threshold defined in lifecycle calculator)
# Note: With Pool as source of truth, we track "fresh" state based on
# when data was last fetched from the API (tracked by _api_calls_today counter)
# Note: This updates _lifecycle_state for diagnostics only.
# The lifecycle sensor calculates its state dynamically in get_lifecycle_state(),
# checking _last_price_update timestamp directly.
if self._lifecycle_state == "fresh":
# After 5 minutes, data is considered "cached" (no longer "just fetched")
self._lifecycle_state = "cached"
@ -601,9 +604,8 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
self._last_api_call_date = current_date
# Set _is_fetching flag - lifecycle sensor shows "refreshing" during fetch
# Note: Lifecycle sensor reads this flag directly in get_lifecycle_state()
self._is_fetching = True
# Immediately notify lifecycle sensor about state change
self.async_update_listeners()
# Get current price info to check if tomorrow data already exists
current_price_info = self.data.get("priceInfo", []) if self.data else []
@ -631,6 +633,8 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
"API call completed: Fetched %d intervals, updating lifecycle to 'fresh'",
len(result["priceInfo"]),
)
# Note: _lifecycle_state is for diagnostics only.
# Lifecycle sensor calculates state dynamically from _last_price_update.
elif not api_called:
# Using cached data - lifecycle stays as is (cached/searching_tomorrow/etc.)
_LOGGER.debug(
@ -644,7 +648,8 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
) as err:
# Reset lifecycle state on error
self._is_fetching = False
self._lifecycle_state = "error"
self._lifecycle_state = "error" # For diagnostics
# Note: Lifecycle sensor detects errors via coordinator.last_exception
# Track rate limit errors for repair system
await self._track_rate_limit_error(err)

View file

@ -1,4 +1,24 @@
"""Attribute builders for lifecycle diagnostic sensor."""
"""
Attribute builders for lifecycle diagnostic sensor.
This sensor uses event-based updates with state-change filtering to minimize
recorder entries. Only attributes that are relevant to the lifecycle STATE
are included here - attributes that change independently of state belong
in a separate sensor or diagnostics.
Included attributes (update only on state change):
- tomorrow_available: Whether tomorrow's price data is available
- next_api_poll: When the next API poll will occur (builds user trust)
- updates_today: Number of API calls made today
- last_turnover: When the last midnight turnover occurred
- last_error: Details of the last error (if any)
Pool statistics (sensor_intervals_count, cache_fill_percent, etc.) are
intentionally NOT included here because they change independently of
the lifecycle state. With state-change filtering, these would become
stale. Pool statistics are available via diagnostics or could be
exposed as a separate sensor if needed.
"""
from __future__ import annotations
@ -13,11 +33,6 @@ if TYPE_CHECKING:
)
# Constants for fetch age formatting
MINUTES_PER_HOUR = 60
MINUTES_PER_DAY = 1440 # 24 * 60
def build_lifecycle_attributes(
coordinator: TibberPricesDataUpdateCoordinator,
lifecycle_calculator: TibberPricesLifecycleCalculator,
@ -25,8 +40,11 @@ def build_lifecycle_attributes(
"""
Build attributes for data_lifecycle_status sensor.
Shows comprehensive pool status, data availability, and update timing.
Separates sensor-related stats from cache stats for clarity.
Event-based updates with state-change filtering - attributes only update
when the lifecycle STATE changes (fresh→cached, cached→turnover_pending, etc.).
Only includes attributes that are directly relevant to the lifecycle state.
Pool statistics are intentionally excluded to avoid stale data.
Returns:
Dict with lifecycle attributes
@ -34,75 +52,31 @@ def build_lifecycle_attributes(
"""
attributes: dict[str, Any] = {}
# === Pool Statistics (source of truth for cached data) ===
pool_stats = lifecycle_calculator.get_pool_stats()
if pool_stats:
# --- Sensor Intervals (Protected Range: yesterday through the day after tomorrow) ---
attributes["sensor_intervals_count"] = pool_stats.get("sensor_intervals_count", 0)
attributes["sensor_intervals_expected"] = pool_stats.get("sensor_intervals_expected", 384)
attributes["sensor_intervals_has_gaps"] = pool_stats.get("sensor_intervals_has_gaps", True)
# --- Cache Statistics (Entire Pool) ---
attributes["cache_intervals_total"] = pool_stats.get("cache_intervals_total", 0)
attributes["cache_intervals_limit"] = pool_stats.get("cache_intervals_limit", 960)
attributes["cache_fill_percent"] = pool_stats.get("cache_fill_percent", 0)
attributes["cache_intervals_extra"] = pool_stats.get("cache_intervals_extra", 0)
# --- Timestamps ---
last_sensor_fetch = pool_stats.get("last_sensor_fetch")
if last_sensor_fetch:
attributes["last_sensor_fetch"] = last_sensor_fetch
oldest_interval = pool_stats.get("cache_oldest_interval")
if oldest_interval:
attributes["cache_oldest_interval"] = oldest_interval
newest_interval = pool_stats.get("cache_newest_interval")
if newest_interval:
attributes["cache_newest_interval"] = newest_interval
# --- API Fetch Groups (internal tracking) ---
attributes["fetch_groups_count"] = pool_stats.get("fetch_groups_count", 0)
# === Sensor Fetch Age (human-readable) ===
fetch_age = lifecycle_calculator.get_sensor_fetch_age_minutes()
if fetch_age is not None:
# Format fetch age with units for better readability
if fetch_age < MINUTES_PER_HOUR:
attributes["sensor_fetch_age"] = f"{fetch_age} min"
elif fetch_age < MINUTES_PER_DAY: # Less than 24 hours
hours = fetch_age // MINUTES_PER_HOUR
minutes = fetch_age % MINUTES_PER_HOUR
attributes["sensor_fetch_age"] = f"{hours}h {minutes}min" if minutes > 0 else f"{hours}h"
else: # 24+ hours
days = fetch_age // MINUTES_PER_DAY
hours = (fetch_age % MINUTES_PER_DAY) // MINUTES_PER_HOUR
attributes["sensor_fetch_age"] = f"{days}d {hours}h" if hours > 0 else f"{days}d"
# Keep raw value for automations
attributes["sensor_fetch_age_minutes"] = fetch_age
# === Tomorrow Data Status ===
# Critical for understanding lifecycle state transitions
attributes["tomorrow_available"] = lifecycle_calculator.has_tomorrow_data()
attributes["tomorrow_expected_after"] = "13:00"
# === Next Actions ===
# === Next API Poll Time ===
# Builds user trust: shows when the integration will check for tomorrow data
# - Before 13:00: Shows today 13:00 (when tomorrow-search begins)
# - After 13:00 without tomorrow data: Shows next Timer #1 execution (active polling)
# - After 13:00 with tomorrow data: Shows tomorrow 13:00 (predictive)
next_poll = lifecycle_calculator.get_next_api_poll_time()
if next_poll: # None means data is complete, no more polls needed
if next_poll:
attributes["next_api_poll"] = next_poll.isoformat()
next_midnight = lifecycle_calculator.get_next_midnight_turnover_time()
attributes["next_midnight_turnover"] = next_midnight.isoformat()
# === Update Statistics ===
# Shows API activity - resets at midnight with turnover
api_calls = lifecycle_calculator.get_api_calls_today()
attributes["updates_today"] = api_calls
# === Midnight Turnover Info ===
if coordinator._midnight_handler.last_turnover_time: # noqa: SLF001 - Internal state access for diagnostic display
# When was the last successful data rotation
if coordinator._midnight_handler.last_turnover_time: # noqa: SLF001
attributes["last_turnover"] = coordinator._midnight_handler.last_turnover_time.isoformat() # noqa: SLF001
# === Error Status ===
# Present only when there's an active error
if coordinator.last_exception:
attributes["last_error"] = str(coordinator.last_exception)

View file

@ -3,7 +3,6 @@
from __future__ import annotations
from datetime import datetime, timedelta
from typing import Any
from custom_components.tibber_prices.coordinator.constants import UPDATE_INTERVAL
@ -14,10 +13,6 @@ FRESH_DATA_THRESHOLD_MINUTES = 5 # Data is "fresh" within 5 minutes of API fetc
TOMORROW_CHECK_HOUR = 13 # After 13:00, we actively check for tomorrow data
TURNOVER_WARNING_SECONDS = 900 # Warn 15 minutes before midnight (last quarter-hour: 23:45-00:00)
# Constants for 15-minute update boundaries (Timer #1)
QUARTER_HOUR_BOUNDARIES = [0, 15, 30, 45] # Minutes when Timer #1 can trigger
LAST_HOUR_OF_DAY = 23
class TibberPricesLifecycleCalculator(TibberPricesBaseCalculator):
"""Calculate data lifecycle status and metadata."""
@ -79,28 +74,6 @@ class TibberPricesLifecycleCalculator(TibberPricesBaseCalculator):
# Priority 6: Default - using cached data
return "cached"
def get_sensor_fetch_age_minutes(self) -> int | None:
    """
    Return the age of the last sensor-data fetch, in whole minutes.

    The Pool's ``last_sensor_fetch`` timestamp is the source of truth.
    Only API fetches for sensor data (protected range) are counted;
    service-triggered fetches for chart data are ignored.

    Returns:
        Minutes since the last sensor fetch, or None if no fetch is recorded.

    """
    stats = self._get_pool_stats()
    last_fetch_iso = stats.get("last_sensor_fetch") if stats else None
    if not last_fetch_iso:
        return None

    # Pool stores the timestamp as an ISO-8601 string
    fetched_at = datetime.fromisoformat(last_fetch_iso)
    elapsed = self.coordinator.time.now() - fetched_at
    # int() truncates toward zero, so partial minutes are dropped
    return int(elapsed.total_seconds() / 60)
def get_next_api_poll_time(self) -> datetime | None:
"""
Calculate when the next API poll attempt will occur.
@ -189,15 +162,6 @@ class TibberPricesLifecycleCalculator(TibberPricesBaseCalculator):
# Fallback: If we don't know timer offset yet, assume 13:00:00
return tomorrow_13
def get_next_midnight_turnover_time(self) -> datetime:
    """Return the local datetime of the upcoming midnight turnover."""
    time_service = self.coordinator.time
    now_local = time_service.as_local(time_service.now())
    # Truncate to today's midnight, then step one day forward
    midnight_today = now_local.replace(hour=0, minute=0, second=0, microsecond=0)
    return midnight_today + timedelta(days=1)
def get_api_calls_today(self) -> int:
"""Get the number of API calls made today."""
coordinator = self.coordinator
@ -218,47 +182,3 @@ class TibberPricesLifecycleCalculator(TibberPricesBaseCalculator):
"""
return not self.coordinator._needs_tomorrow_data() # noqa: SLF001
def get_pool_stats(self) -> dict[str, Any] | None:
    """
    Return interval pool statistics.

    Returns:
        Dict with pool stats, or None if the pool is not available.
        Keys:
            Sensor intervals (protected range):
                sensor_intervals_count: Intervals in protected range
                sensor_intervals_expected: Expected count (usually 384)
                sensor_intervals_has_gaps: True if gaps exist
            Cache statistics:
                cache_intervals_total: Total intervals in cache
                cache_intervals_limit: Maximum cache size
                cache_fill_percent: How full the cache is (%)
                cache_intervals_extra: Intervals outside protected range
            Timestamps:
                last_sensor_fetch: When sensor data was last fetched
                cache_oldest_interval: Oldest interval in cache
                cache_newest_interval: Newest interval in cache
            Metadata:
                fetch_groups_count: Number of API fetch batches stored

    """
    # Public wrapper around the private accessor
    return self._get_pool_stats()
def _get_pool_stats(self) -> dict[str, Any] | None:
    """
    Get pool stats from the coordinator's price data manager.

    The pool is reached via two private attributes that may be absent
    (e.g. in partially-mocked coordinators), so each hop falls back to
    None instead of raising.

    Returns:
        Pool statistics dict, or None if no pool is reachable.

    """
    # getattr with a None default collapses the nested hasattr checks
    # into a flat, fail-soft lookup chain.
    manager = getattr(self.coordinator, "_price_data_manager", None)
    pool = getattr(manager, "_interval_pool", None)
    if pool is None:
        return None
    return pool.get_pool_stats()

View file

@ -177,6 +177,9 @@ class TibberPricesSensor(TibberPricesEntity, RestoreSensor):
self._value_getter: Callable | None = self._get_value_getter()
self._time_sensitive_remove_listener: Callable | None = None
self._minute_update_remove_listener: Callable | None = None
# Lifecycle sensor state change detection (for recorder optimization)
# Store as Any because native_value can be str/float/datetime depending on sensor type
self._last_lifecycle_state: Any = None
# Chart data export (for chart_data_export sensor) - from binary_sensor
self._chart_data_last_update = None # Track last service call timestamp
self._chart_data_error = None # Track last service call error
@ -312,7 +315,18 @@ class TibberPricesSensor(TibberPricesEntity, RestoreSensor):
# Clear trend calculation cache for trend sensors
elif self.entity_description.key in ("current_price_trend", "next_price_trend_change"):
self._trend_calculator.clear_calculation_cache()
self.async_write_ha_state()
# For lifecycle sensor: Only write state if it actually changed (state-change filter)
# This enables precise detection at quarter-hour boundaries (23:45 turnover_pending,
# 13:00 searching_tomorrow, 00:00 turnover complete) without recorder spam
if self.entity_description.key == "data_lifecycle_status":
current_state = self.native_value
if current_state != self._last_lifecycle_state:
self._last_lifecycle_state = current_state
self.async_write_ha_state()
# If state didn't change, skip write to recorder
else:
self.async_write_ha_state()
@callback
def _handle_minute_update(self, time_service: TibberPricesTimeService) -> None:
@ -347,7 +361,16 @@ class TibberPricesSensor(TibberPricesEntity, RestoreSensor):
# Schedule async refresh as a task (we're in a callback)
self.hass.async_create_task(self._refresh_chart_metadata())
super()._handle_coordinator_update()
# For lifecycle sensor: Only write state if it actually changed (event-based filter)
# Prevents excessive recorder entries while keeping quarter-hour update capability
if self.entity_description.key == "data_lifecycle_status":
current_state = self.native_value
if current_state != self._last_lifecycle_state:
self._last_lifecycle_state = current_state
super()._handle_coordinator_update()
# If state didn't change, skip write to recorder
else:
super()._handle_coordinator_update()
def _get_value_getter(self) -> Callable | None:
"""Return the appropriate value getter method based on the sensor type."""

View file

@ -1,202 +0,0 @@
"""
Unit tests for sensor fetch age calculation.
Tests the get_sensor_fetch_age_minutes() method which calculates how old
the sensor data is in minutes (based on last API fetch for sensor intervals).
"""
from __future__ import annotations
from datetime import datetime, timedelta
from unittest.mock import Mock
from zoneinfo import ZoneInfo
import pytest
from custom_components.tibber_prices.sensor.calculators.lifecycle import (
TibberPricesLifecycleCalculator,
)
def _create_mock_coordinator_with_pool(
current_time: datetime,
last_sensor_fetch: datetime | None,
) -> Mock:
"""Create a mock coordinator with pool stats configured."""
coordinator = Mock()
coordinator.time = Mock()
coordinator.time.now.return_value = current_time
# Mock the pool stats access path
mock_pool = Mock()
if last_sensor_fetch is not None:
mock_pool.get_pool_stats.return_value = {
# Sensor intervals (protected range)
"sensor_intervals_count": 384,
"sensor_intervals_expected": 384,
"sensor_intervals_has_gaps": False,
# Cache statistics
"cache_intervals_total": 384,
"cache_intervals_limit": 960,
"cache_fill_percent": 40.0,
"cache_intervals_extra": 0,
# Timestamps
"last_sensor_fetch": last_sensor_fetch.isoformat(),
"cache_oldest_interval": "2025-11-20T00:00:00",
"cache_newest_interval": "2025-11-23T23:45:00",
# Metadata
"fetch_groups_count": 1,
}
else:
mock_pool.get_pool_stats.return_value = {
# Sensor intervals (protected range)
"sensor_intervals_count": 0,
"sensor_intervals_expected": 384,
"sensor_intervals_has_gaps": True,
# Cache statistics
"cache_intervals_total": 0,
"cache_intervals_limit": 960,
"cache_fill_percent": 0,
"cache_intervals_extra": 0,
# Timestamps
"last_sensor_fetch": None,
"cache_oldest_interval": None,
"cache_newest_interval": None,
# Metadata
"fetch_groups_count": 0,
}
mock_price_data_manager = Mock()
mock_price_data_manager._interval_pool = mock_pool # noqa: SLF001
coordinator._price_data_manager = mock_price_data_manager # noqa: SLF001
return coordinator
@pytest.mark.unit
def test_sensor_fetch_age_no_update() -> None:
    """
    Fetch age must be None before any data has been fetched.

    Scenario: Integration just started, no data fetched yet
    Expected: Fetch age is None
    """
    now = datetime(2025, 11, 22, 14, 30, 0, tzinfo=ZoneInfo("Europe/Oslo"))
    coord = _create_mock_coordinator_with_pool(now, None)
    calc = TibberPricesLifecycleCalculator(coord)

    assert calc.get_sensor_fetch_age_minutes() is None
@pytest.mark.unit
def test_sensor_fetch_age_recent() -> None:
    """
    Recently fetched data reports its age in minutes.

    Scenario: Last update was 5 minutes ago
    Expected: Fetch age is 5 minutes
    """
    now = datetime(2025, 11, 22, 14, 30, 0, tzinfo=ZoneInfo("Europe/Oslo"))
    coord = _create_mock_coordinator_with_pool(now, now - timedelta(minutes=5))
    calc = TibberPricesLifecycleCalculator(coord)

    assert calc.get_sensor_fetch_age_minutes() == 5
@pytest.mark.unit
def test_sensor_fetch_age_old() -> None:
    """
    Older data reports a correspondingly large age.

    Scenario: Last update was 90 minutes ago (6 update cycles missed)
    Expected: Fetch age is 90 minutes
    """
    now = datetime(2025, 11, 22, 14, 30, 0, tzinfo=ZoneInfo("Europe/Oslo"))
    coord = _create_mock_coordinator_with_pool(now, now - timedelta(minutes=90))
    calc = TibberPricesLifecycleCalculator(coord)

    assert calc.get_sensor_fetch_age_minutes() == 90
@pytest.mark.unit
def test_sensor_fetch_age_exact_minute() -> None:
    """
    Fetch age calculation truncates partial minutes.

    Scenario: Last update was 5 minutes and 45 seconds ago
    Expected: Fetch age is 5 minutes (int conversion truncates)
    """
    now = datetime(2025, 11, 22, 14, 30, 0, tzinfo=ZoneInfo("Europe/Oslo"))
    fetched_at = now - timedelta(minutes=5, seconds=45)
    coord = _create_mock_coordinator_with_pool(now, fetched_at)
    calc = TibberPricesLifecycleCalculator(coord)

    # int() truncates: 5.75 minutes -> 5
    assert calc.get_sensor_fetch_age_minutes() == 5
@pytest.mark.unit
def test_sensor_fetch_age_zero_fresh_data() -> None:
    """
    Brand-new data has an age of zero minutes.

    Scenario: Last update was just now (< 60 seconds ago)
    Expected: Fetch age is 0 minutes
    """
    now = datetime(2025, 11, 22, 14, 30, 0, tzinfo=ZoneInfo("Europe/Oslo"))
    coord = _create_mock_coordinator_with_pool(now, now - timedelta(seconds=30))
    calc = TibberPricesLifecycleCalculator(coord)

    assert calc.get_sensor_fetch_age_minutes() == 0
@pytest.mark.unit
def test_sensor_fetch_age_multiple_hours() -> None:
    """
    Very old data (multiple hours) is still reported in minutes.

    Scenario: Last update was 3 hours ago (180 minutes)
    Expected: Fetch age is 180 minutes

    This could happen if the API was down or the integration was stopped.
    """
    now = datetime(2025, 11, 22, 14, 30, 0, tzinfo=ZoneInfo("Europe/Oslo"))
    coord = _create_mock_coordinator_with_pool(now, now - timedelta(hours=3))
    calc = TibberPricesLifecycleCalculator(coord)

    assert calc.get_sensor_fetch_age_minutes() == 180
@pytest.mark.unit
def test_sensor_fetch_age_boundary_60_seconds() -> None:
    """
    Exactly 60 seconds of age rounds to one full minute.

    Scenario: Last update was exactly 60 seconds ago
    Expected: Fetch age is 1 minute
    """
    now = datetime(2025, 11, 22, 14, 30, 0, tzinfo=ZoneInfo("Europe/Oslo"))
    coord = _create_mock_coordinator_with_pool(now, now - timedelta(seconds=60))
    calc = TibberPricesLifecycleCalculator(coord)

    assert calc.get_sensor_fetch_age_minutes() == 1

View file

@ -257,17 +257,27 @@ def test_timing_sensors_use_minute_timer() -> None:
)
def test_lifecycle_sensor_uses_quarter_hour_timer() -> None:
def test_lifecycle_sensor_uses_quarter_hour_timer_with_state_filter() -> None:
"""
Test that data lifecycle status sensor uses Timer #2.
Test that data lifecycle status sensor uses Timer #2 WITH state-change filtering.
The lifecycle sensor needs quarter-hour updates to detect:
- Turnover pending at 23:45 (quarter-hour boundary)
- Turnover completed after midnight API update
The lifecycle sensor needs quarter-hour precision for detecting:
- 23:45: turnover_pending (last interval before midnight)
- 00:00: turnover complete (after midnight API update)
- 13:00: searching_tomorrow (when tomorrow data search begins)
To prevent recorder spam, it uses state-change filtering in both:
- _handle_coordinator_update() (Timer #1)
- _handle_time_sensitive_update() (Timer #2)
State is only written to recorder if it actually changed.
This reduces recorder entries from ~96/day to ~10-15/day.
"""
# Lifecycle sensor MUST be in TIME_SENSITIVE_ENTITY_KEYS for quarter-hour precision
assert "data_lifecycle_status" in TIME_SENSITIVE_ENTITY_KEYS, (
"Lifecycle sensor needs quarter-hour updates to detect turnover_pending\n"
"at 23:45 (last interval before midnight)"
"Lifecycle sensor needs quarter-hour updates for precise state transitions\n"
"at 23:45 (turnover_pending), 00:00 (turnover complete), 13:00 (searching_tomorrow).\n"
"State-change filter in _handle_time_sensitive_update() prevents recorder spam."
)