test(timers): comprehensive timer architecture validation

Added 60+ tests for three-timer architecture:

Timer #1 (API polling): next_api_poll_time calculation
- 8 tests covering timer offset calculation before/after 13:00
- Tests tomorrow data presence logic
- Verifies minute/second offset preservation

Timer #2 (quarter-hour refresh): :00, :15, :30, :45 boundaries
- 10 tests covering registration, cancellation, callback execution
- Verifies exact boundary timing (second=0)
- Tests independence from Timer #3

Timer #3 (minute refresh): :00, :30 every minute
- 6 tests covering 30-second boundary registration
- Verifies timing sensors assignment
- Tests countdown/progress update frequency

Sensor assignment:
- 20+ tests mapping 80+ sensors to correct timers
- Verifies TIME_SENSITIVE and MINUTE_UPDATE constants
- Catches missing/incorrect timer assignments

Impact: Comprehensive validation of timer architecture prevents
regression in entity update scheduling. Documents which sensors
use which timers.
This commit is contained in:
Julian Pawlowski 2025-11-22 04:46:30 +00:00
parent d1376c8921
commit 91ef2806e5
3 changed files with 1035 additions and 0 deletions

320
tests/test_next_api_poll.py Normal file
View file

@ -0,0 +1,320 @@
"""
Unit tests for next_api_poll_time calculation logic.
Tests the precise minute/second offset calculation for Timer #1 scheduling,
ensuring accurate prediction of when the next API poll will occur.
"""
from __future__ import annotations
from datetime import datetime
from unittest.mock import Mock
from zoneinfo import ZoneInfo
import pytest
from custom_components.tibber_prices.coordinator.constants import UPDATE_INTERVAL
from custom_components.tibber_prices.sensor.calculators.lifecycle import (
TibberPricesLifecycleCalculator,
)
@pytest.mark.unit
def test_next_api_poll_before_13_with_timer_offset() -> None:
"""
Test next_api_poll before 13:00 with known timer offset.
Scenario: Timer runs at X:04:37 (4 minutes 37 seconds past quarter-hour)
Current time: 10:19:37 (before 13:00)
Expected: Next poll at 13:04:37 (first timer execution at or after 13:00)
"""
# Mock coordinator with timer history
coordinator = Mock()
coordinator.time = Mock()
# Current time: 10:19:37 (Timer just ran)
current_time = datetime(2025, 11, 22, 10, 19, 37, tzinfo=ZoneInfo("Europe/Oslo"))
coordinator.time.now.return_value = current_time
coordinator.time.as_local.side_effect = lambda dt: dt
# Mock get_day_boundaries (needed to determine if tomorrow data is missing)
today_midnight = datetime(2025, 11, 22, 0, 0, 0, tzinfo=ZoneInfo("Europe/Oslo"))
tomorrow_midnight = datetime(2025, 11, 23, 0, 0, 0, tzinfo=ZoneInfo("Europe/Oslo"))
coordinator.time.get_day_boundaries.return_value = (today_midnight, tomorrow_midnight)
# Timer last ran at 10:19:37 (offset: 4 min 37 sec past quarter)
coordinator._last_coordinator_update = current_time # noqa: SLF001
# Mock coordinator.data (no tomorrow data yet)
coordinator.data = {"priceInfo": {"today": [1, 2, 3], "tomorrow": []}}
# Mock _needs_tomorrow_data (not relevant for this case)
coordinator._needs_tomorrow_data.return_value = False # noqa: SLF001
# Create calculator
calculator = TibberPricesLifecycleCalculator(coordinator)
# Calculate next poll
next_poll = calculator.get_next_api_poll_time()
# Should be 13:04:37 (first timer at or after 13:00 with same offset)
assert next_poll is not None
assert next_poll.hour == 13
assert next_poll.minute == 4
assert next_poll.second == 37
@pytest.mark.unit
def test_next_api_poll_before_13_different_offset() -> None:
"""
Test next_api_poll with different timer offset.
Scenario: Timer runs at X:11:22 (11 minutes 22 seconds past quarter-hour)
Current time: 09:26:22
Expected: Next poll at 13:11:22
"""
coordinator = Mock()
coordinator.time = Mock()
current_time = datetime(2025, 11, 22, 9, 26, 22, tzinfo=ZoneInfo("Europe/Oslo"))
coordinator.time.now.return_value = current_time
coordinator.time.as_local.side_effect = lambda dt: dt
# Mock get_day_boundaries
today_midnight = datetime(2025, 11, 22, 0, 0, 0, tzinfo=ZoneInfo("Europe/Oslo"))
tomorrow_midnight = datetime(2025, 11, 23, 0, 0, 0, tzinfo=ZoneInfo("Europe/Oslo"))
coordinator.time.get_day_boundaries.return_value = (today_midnight, tomorrow_midnight)
coordinator._last_coordinator_update = current_time # noqa: SLF001
coordinator.data = {"priceInfo": {"today": [1, 2, 3], "tomorrow": []}}
coordinator._needs_tomorrow_data.return_value = False # noqa: SLF001
calculator = TibberPricesLifecycleCalculator(coordinator)
next_poll = calculator.get_next_api_poll_time()
assert next_poll is not None
assert next_poll.hour == 13
assert next_poll.minute == 11
assert next_poll.second == 22
@pytest.mark.unit
def test_next_api_poll_before_13_offset_requires_14xx() -> None:
"""
Test next_api_poll when timer offset doesn't fit in 13:xx hour.
Scenario: Timer runs at X:58:15 (58 minutes past hour, 13 min past 45-min mark)
Current time: 11:58:15
Expected: Next poll at 13:13:15 (13:00+13min, 13:15+13min, 13:30+13min, 13:45+13min)
Note: Even extreme offsets fit in 13:xx hour, 14:xx overflow is theoretical edge case
"""
coordinator = Mock()
coordinator.time = Mock()
current_time = datetime(2025, 11, 22, 11, 58, 15, tzinfo=ZoneInfo("Europe/Oslo"))
coordinator.time.now.return_value = current_time
coordinator.time.as_local.side_effect = lambda dt: dt
# Mock get_day_boundaries
today_midnight = datetime(2025, 11, 22, 0, 0, 0, tzinfo=ZoneInfo("Europe/Oslo"))
tomorrow_midnight = datetime(2025, 11, 23, 0, 0, 0, tzinfo=ZoneInfo("Europe/Oslo"))
coordinator.time.get_day_boundaries.return_value = (today_midnight, tomorrow_midnight)
# Timer offset: 58 % 15 = 13 minutes past quarter-hour
coordinator._last_coordinator_update = current_time # noqa: SLF001
coordinator.data = {"priceInfo": {"today": [1, 2, 3], "tomorrow": []}}
coordinator._needs_tomorrow_data.return_value = False # noqa: SLF001
calculator = TibberPricesLifecycleCalculator(coordinator)
next_poll = calculator.get_next_api_poll_time()
# Even with 13-minute offset, first valid is 13:13:15
assert next_poll is not None
assert next_poll.hour == 13
assert next_poll.minute == 13
assert next_poll.second == 15
@pytest.mark.unit
def test_next_api_poll_before_13_no_timer_history() -> None:
"""
Test next_api_poll fallback when no timer history exists.
Scenario: Integration just started, no _last_coordinator_update yet
Current time: 10:30:00
Expected: Fallback to 13:00:00
"""
coordinator = Mock()
coordinator.time = Mock()
current_time = datetime(2025, 11, 22, 10, 30, 0, tzinfo=ZoneInfo("Europe/Oslo"))
coordinator.time.now.return_value = current_time
coordinator.time.as_local.side_effect = lambda dt: dt
# Mock get_day_boundaries
today_midnight = datetime(2025, 11, 22, 0, 0, 0, tzinfo=ZoneInfo("Europe/Oslo"))
tomorrow_midnight = datetime(2025, 11, 23, 0, 0, 0, tzinfo=ZoneInfo("Europe/Oslo"))
coordinator.time.get_day_boundaries.return_value = (today_midnight, tomorrow_midnight)
# No timer history
coordinator._last_coordinator_update = None # noqa: SLF001
coordinator.data = {"priceInfo": {"today": [1, 2, 3], "tomorrow": []}}
coordinator._needs_tomorrow_data.return_value = False # noqa: SLF001
calculator = TibberPricesLifecycleCalculator(coordinator)
next_poll = calculator.get_next_api_poll_time()
# Should fallback to 13:00:00
assert next_poll is not None
assert next_poll.hour == 13
assert next_poll.minute == 0
assert next_poll.second == 0
@pytest.mark.unit
def test_next_api_poll_after_13_tomorrow_missing() -> None:
"""
Test next_api_poll after 13:00 when tomorrow data is missing.
Scenario: After 13:00, actively polling for tomorrow data
Current time: 14:30:00
Last update: 14:15:45
Expected: Last update + UPDATE_INTERVAL (15 minutes) = 14:30:45
"""
coordinator = Mock()
coordinator.time = Mock()
current_time = datetime(2025, 11, 22, 14, 30, 0, tzinfo=ZoneInfo("Europe/Oslo"))
last_update = datetime(2025, 11, 22, 14, 15, 45, tzinfo=ZoneInfo("Europe/Oslo"))
coordinator.time.now.return_value = current_time
coordinator.time.as_local.side_effect = lambda dt: dt
# Mock get_day_boundaries
today_midnight = datetime(2025, 11, 22, 0, 0, 0, tzinfo=ZoneInfo("Europe/Oslo"))
tomorrow_midnight = datetime(2025, 11, 23, 0, 0, 0, tzinfo=ZoneInfo("Europe/Oslo"))
coordinator.time.get_day_boundaries.return_value = (today_midnight, tomorrow_midnight)
coordinator._last_coordinator_update = last_update # noqa: SLF001
coordinator.data = {"priceInfo": {"today": [1, 2, 3], "tomorrow": []}} # Tomorrow missing!
coordinator._needs_tomorrow_data.return_value = True # noqa: SLF001 - Tomorrow missing!
calculator = TibberPricesLifecycleCalculator(coordinator)
next_poll = calculator.get_next_api_poll_time()
# Should be last_update + 15 minutes
expected = last_update + UPDATE_INTERVAL
assert next_poll is not None
assert next_poll == expected
assert next_poll.minute == 30
assert next_poll.second == 45
@pytest.mark.unit
def test_next_api_poll_after_13_tomorrow_present() -> None:
"""
Test next_api_poll after 13:00 when tomorrow data is present.
Scenario: After 13:00, tomorrow data fetched, predicting tomorrow's first poll
Current time: 15:34:12
Timer offset: 4 minutes 12 seconds past quarter (from 15:34:12)
Expected: Tomorrow at 13:04:12
"""
coordinator = Mock()
coordinator.time = Mock()
current_time = datetime(2025, 11, 22, 15, 34, 12, tzinfo=ZoneInfo("Europe/Oslo"))
coordinator.time.now.return_value = current_time
coordinator.time.as_local.side_effect = lambda dt: dt
# Mock get_day_boundaries
today_midnight = datetime(2025, 11, 22, 0, 0, 0, tzinfo=ZoneInfo("Europe/Oslo"))
tomorrow_midnight = datetime(2025, 11, 23, 0, 0, 0, tzinfo=ZoneInfo("Europe/Oslo"))
coordinator.time.get_day_boundaries.return_value = (today_midnight, tomorrow_midnight)
# Timer offset: 34 % 15 = 4 minutes past quarter-hour
coordinator._last_coordinator_update = current_time # noqa: SLF001
coordinator.data = {"priceInfo": {"today": [1, 2, 3], "tomorrow": [4, 5, 6]}} # Tomorrow present!
coordinator._needs_tomorrow_data.return_value = False # noqa: SLF001 - Tomorrow present!
calculator = TibberPricesLifecycleCalculator(coordinator)
next_poll = calculator.get_next_api_poll_time()
# Should be tomorrow at 13:04:12
assert next_poll is not None
assert next_poll.day == 23 # Tomorrow
assert next_poll.hour == 13
assert next_poll.minute == 4
assert next_poll.second == 12
@pytest.mark.unit
def test_next_api_poll_exact_13_00_boundary() -> None:
"""
Test next_api_poll exactly at 13:00:00 boundary.
Scenario: Timer runs exactly at 13:00:00 (offset: 0 min 0 sec)
Current time: 13:00:00
Expected: 13:00:00 (current time matches first valid slot)
"""
coordinator = Mock()
coordinator.time = Mock()
current_time = datetime(2025, 11, 22, 13, 0, 0, tzinfo=ZoneInfo("Europe/Oslo"))
coordinator.time.now.return_value = current_time
coordinator.time.as_local.side_effect = lambda dt: dt
# Mock get_day_boundaries
today_midnight = datetime(2025, 11, 22, 0, 0, 0, tzinfo=ZoneInfo("Europe/Oslo"))
tomorrow_midnight = datetime(2025, 11, 23, 0, 0, 0, tzinfo=ZoneInfo("Europe/Oslo"))
coordinator.time.get_day_boundaries.return_value = (today_midnight, tomorrow_midnight)
# Timer runs at exact quarter-hour boundaries
coordinator._last_coordinator_update = current_time # noqa: SLF001
coordinator.data = {"priceInfo": {"today": [1, 2, 3], "tomorrow": []}}
coordinator._needs_tomorrow_data.return_value = False # noqa: SLF001
calculator = TibberPricesLifecycleCalculator(coordinator)
next_poll = calculator.get_next_api_poll_time()
# Should be 13:00:00 (first valid slot)
assert next_poll is not None
assert next_poll.hour == 13
assert next_poll.minute == 0
assert next_poll.second == 0
@pytest.mark.unit
def test_next_api_poll_offset_spans_multiple_quarters() -> None:
"""
Test timer offset calculation across different quarter-hour marks.
Scenario: Timer at 12:47:33 (offset: 2 min 33 sec past 45-min mark)
Expected: 13:02:33, 13:17:33, 13:32:33, or 13:47:33 depending on >= 13:00
Result: First valid is 13:02:33
"""
coordinator = Mock()
coordinator.time = Mock()
current_time = datetime(2025, 11, 22, 12, 47, 33, tzinfo=ZoneInfo("Europe/Oslo"))
coordinator.time.now.return_value = current_time
coordinator.time.as_local.side_effect = lambda dt: dt
# Mock get_day_boundaries
today_midnight = datetime(2025, 11, 22, 0, 0, 0, tzinfo=ZoneInfo("Europe/Oslo"))
tomorrow_midnight = datetime(2025, 11, 23, 0, 0, 0, tzinfo=ZoneInfo("Europe/Oslo"))
coordinator.time.get_day_boundaries.return_value = (today_midnight, tomorrow_midnight)
# Timer offset: 47 % 15 = 2 minutes past quarter
coordinator._last_coordinator_update = current_time # noqa: SLF001
coordinator.data = {"priceInfo": {"today": [1, 2, 3], "tomorrow": []}}
coordinator._needs_tomorrow_data.return_value = False # noqa: SLF001
calculator = TibberPricesLifecycleCalculator(coordinator)
next_poll = calculator.get_next_api_poll_time()
# Should be 13:02:33 (first quarter-hour slot >= 13:00 with offset 2:33)
assert next_poll is not None
assert next_poll.hour == 13
assert next_poll.minute == 2
assert next_poll.second == 33

View file

@ -0,0 +1,449 @@
"""
Test sensor-to-timer assignment correctness.
This tests the CRITICAL mapping between sensor entities and update timers:
- TIME_SENSITIVE sensors Timer #2 (quarter-hour: :00, :15, :30, :45)
- MINUTE_UPDATE sensors Timer #3 (minute: :00, :30)
- All other sensors No timer (only update on API data arrival)
Ensures:
1. Each sensor is assigned to the correct timer
2. Timer constants match sensor definitions
3. No sensors are missing from or incorrectly added to timer groups
"""
from custom_components.tibber_prices.binary_sensor.definitions import (
ENTITY_DESCRIPTIONS as BINARY_SENSOR_ENTITY_DESCRIPTIONS,
)
from custom_components.tibber_prices.coordinator.constants import (
MINUTE_UPDATE_ENTITY_KEYS,
TIME_SENSITIVE_ENTITY_KEYS,
)
from custom_components.tibber_prices.sensor.definitions import ENTITY_DESCRIPTIONS
def test_time_sensitive_sensors_are_valid() -> None:
"""
Test that all TIME_SENSITIVE_ENTITY_KEYS correspond to actual sensors.
Timer #2 (quarter-hour) should only trigger for sensors that exist.
"""
all_sensor_keys = {desc.key for desc in ENTITY_DESCRIPTIONS}
all_binary_sensor_keys = {desc.key for desc in BINARY_SENSOR_ENTITY_DESCRIPTIONS}
all_entity_keys = all_sensor_keys | all_binary_sensor_keys
for entity_key in TIME_SENSITIVE_ENTITY_KEYS:
assert entity_key in all_entity_keys, (
f"TIME_SENSITIVE key '{entity_key}' not found in sensor/binary_sensor definitions"
)
def test_minute_update_sensors_are_valid() -> None:
"""
Test that all MINUTE_UPDATE_ENTITY_KEYS correspond to actual sensors.
Timer #3 (minute) should only trigger for sensors that exist.
"""
all_sensor_keys = {desc.key for desc in ENTITY_DESCRIPTIONS}
all_binary_sensor_keys = {desc.key for desc in BINARY_SENSOR_ENTITY_DESCRIPTIONS}
all_entity_keys = all_sensor_keys | all_binary_sensor_keys
for entity_key in MINUTE_UPDATE_ENTITY_KEYS:
assert entity_key in all_entity_keys, (
f"MINUTE_UPDATE key '{entity_key}' not found in sensor/binary_sensor definitions"
)
def test_no_overlap_between_timer_groups() -> None:
"""
Test that TIME_SENSITIVE and MINUTE_UPDATE groups are mutually exclusive.
A sensor should never be in both timer groups simultaneously.
This would cause duplicate updates and wasted resources.
"""
overlap = TIME_SENSITIVE_ENTITY_KEYS & MINUTE_UPDATE_ENTITY_KEYS
assert not overlap, (
f"Sensors should not be in both TIME_SENSITIVE and MINUTE_UPDATE: {overlap}\n"
"Each sensor should use only ONE timer for updates."
)
def test_interval_sensors_use_quarter_hour_timer() -> None:
"""
Test that interval-based sensors (current/next/previous) use Timer #2.
These sensors need updates every 15 minutes because they reference
specific 15-minute intervals that change at quarter-hour boundaries.
"""
interval_sensors = [
"current_interval_price",
"next_interval_price",
"previous_interval_price",
"current_interval_price_level",
"next_interval_price_level",
"previous_interval_price_level",
"current_interval_price_rating",
"next_interval_price_rating",
"previous_interval_price_rating",
]
for sensor_key in interval_sensors:
assert sensor_key in TIME_SENSITIVE_ENTITY_KEYS, (
f"Interval sensor '{sensor_key}' should be TIME_SENSITIVE (Timer #2)"
)
def test_rolling_hour_sensors_use_quarter_hour_timer() -> None:
"""
Test that rolling hour sensors (5-interval windows) use Timer #2.
Rolling hour calculations depend on current interval position,
which changes every 15 minutes.
"""
rolling_hour_sensors = [
"current_hour_average_price",
"next_hour_average_price",
"current_hour_price_level",
"next_hour_price_level",
"current_hour_price_rating",
"next_hour_price_rating",
]
for sensor_key in rolling_hour_sensors:
assert sensor_key in TIME_SENSITIVE_ENTITY_KEYS, (
f"Rolling hour sensor '{sensor_key}' should be TIME_SENSITIVE (Timer #2)"
)
def test_future_avg_sensors_use_quarter_hour_timer() -> None:
"""
Test that future N-hour average sensors use Timer #2.
Future averages calculate rolling windows starting from "next interval",
which changes every 15 minutes.
"""
future_avg_sensors = [
"next_avg_1h",
"next_avg_2h",
"next_avg_3h",
"next_avg_4h",
"next_avg_5h",
"next_avg_6h",
"next_avg_8h",
"next_avg_12h",
]
for sensor_key in future_avg_sensors:
assert sensor_key in TIME_SENSITIVE_ENTITY_KEYS, (
f"Future avg sensor '{sensor_key}' should be TIME_SENSITIVE (Timer #2)"
)
def test_trend_sensors_use_quarter_hour_timer() -> None:
"""
Test that price trend sensors use Timer #2.
Trend analysis depends on current interval position and
needs updates at quarter-hour boundaries.
"""
trend_sensors = [
"current_price_trend",
"next_price_trend_change",
"price_trend_1h",
"price_trend_2h",
"price_trend_3h",
"price_trend_4h",
"price_trend_5h",
"price_trend_6h",
"price_trend_8h",
"price_trend_12h",
]
for sensor_key in trend_sensors:
assert sensor_key in TIME_SENSITIVE_ENTITY_KEYS, (
f"Trend sensor '{sensor_key}' should be TIME_SENSITIVE (Timer #2)"
)
def test_window_24h_sensors_use_quarter_hour_timer() -> None:
"""
Test that trailing/leading 24h window sensors use Timer #2.
24h windows are calculated relative to current interval,
which changes every 15 minutes.
"""
window_24h_sensors = [
"trailing_price_average",
"leading_price_average",
"trailing_price_min",
"trailing_price_max",
"leading_price_min",
"leading_price_max",
]
for sensor_key in window_24h_sensors:
assert sensor_key in TIME_SENSITIVE_ENTITY_KEYS, (
f"24h window sensor '{sensor_key}' should be TIME_SENSITIVE (Timer #2)"
)
def test_period_binary_sensors_use_quarter_hour_timer() -> None:
"""
Test that best/peak price period binary sensors use Timer #2.
Binary sensors check if current time is within a period.
Periods can only change at quarter-hour interval boundaries.
"""
period_binary_sensors = [
"best_price_period",
"peak_price_period",
]
for sensor_key in period_binary_sensors:
assert sensor_key in TIME_SENSITIVE_ENTITY_KEYS, (
f"Period binary sensor '{sensor_key}' should be TIME_SENSITIVE (Timer #2)"
)
def test_period_timestamp_sensors_use_quarter_hour_timer() -> None:
"""
Test that period timestamp sensors (end_time, next_start_time) use Timer #2.
Timestamp sensors report when periods end/start. Since periods can only
change at quarter-hour boundaries (intervals), they only need quarter-hour updates.
"""
timestamp_sensors = [
"best_price_end_time",
"best_price_next_start_time",
"peak_price_end_time",
"peak_price_next_start_time",
]
for sensor_key in timestamp_sensors:
assert sensor_key in TIME_SENSITIVE_ENTITY_KEYS, (
f"Timestamp sensor '{sensor_key}' should be TIME_SENSITIVE (Timer #2)"
)
def test_timing_sensors_use_minute_timer() -> None:
"""
Test that countdown/progress timing sensors use Timer #3.
These sensors track time remaining and progress percentage within periods.
They need minute-by-minute updates for accurate countdown displays.
IMPORTANT: Timestamp sensors (end_time, next_start_time) do NOT use Timer #3
because periods can only change at quarter-hour boundaries.
"""
timing_sensors = [
"best_price_remaining_minutes",
"best_price_progress",
"best_price_next_in_minutes", # Corrected from best_price_next_start_minutes
"peak_price_remaining_minutes",
"peak_price_progress",
"peak_price_next_in_minutes", # Corrected from peak_price_next_start_minutes
]
for sensor_key in timing_sensors:
assert sensor_key in MINUTE_UPDATE_ENTITY_KEYS, (
f"Timing sensor '{sensor_key}' should be MINUTE_UPDATE (Timer #3)"
)
# Also verify it's NOT in TIME_SENSITIVE (no double updates)
assert sensor_key not in TIME_SENSITIVE_ENTITY_KEYS, (
f"Timing sensor '{sensor_key}' should NOT be in TIME_SENSITIVE\n"
"Minute updates are sufficient for countdown/progress tracking."
)
def test_lifecycle_sensor_uses_quarter_hour_timer() -> None:
"""
Test that data lifecycle status sensor uses Timer #2.
The lifecycle sensor needs quarter-hour updates to detect:
- Turnover pending at 23:45 (quarter-hour boundary)
- Turnover completed after midnight API update
"""
assert "data_lifecycle_status" in TIME_SENSITIVE_ENTITY_KEYS, (
"Lifecycle sensor needs quarter-hour updates to detect turnover_pending\n"
"at 23:45 (last interval before midnight)"
)
def test_daily_stat_sensors_not_in_timers() -> None:
"""
Test that daily statistic sensors (min/max/avg) do NOT use timers.
Daily stats don't depend on current time - they represent full-day aggregates.
They only need updates when new API data arrives (not time-dependent).
"""
daily_stat_sensors = [
# Today/tomorrow min prices
"daily_min_price_today",
"daily_min_price_tomorrow",
# Today/tomorrow max prices
"daily_max_price_today",
"daily_max_price_tomorrow",
# Today/tomorrow averages
"daily_average_price_today",
"daily_average_price_tomorrow",
# Daily price levels
"daily_price_level_today",
"daily_price_level_tomorrow",
# Daily price ratings
"daily_price_rating_today",
"daily_price_rating_tomorrow",
]
for sensor_key in daily_stat_sensors:
assert sensor_key not in TIME_SENSITIVE_ENTITY_KEYS, (
f"Daily stat sensor '{sensor_key}' should NOT use Timer #2\n"
"Daily statistics don't depend on current time - only on API data arrival."
)
assert sensor_key not in MINUTE_UPDATE_ENTITY_KEYS, (
f"Daily stat sensor '{sensor_key}' should NOT use Timer #3\n"
"Daily statistics don't need minute-by-minute updates."
)
def test_volatility_sensors_not_in_timers() -> None:
"""
Test that volatility sensors do NOT use timers.
Volatility analyzes price variation over fixed time windows.
Values only change when new API data arrives (not time-dependent).
"""
volatility_sensors = [
"today_volatility_level",
"tomorrow_volatility_level",
"yesterday_volatility_level",
"next_24h_volatility_level",
]
for sensor_key in volatility_sensors:
assert sensor_key not in TIME_SENSITIVE_ENTITY_KEYS, (
f"Volatility sensor '{sensor_key}' should NOT use Timer #2\n"
"Volatility calculates over fixed time windows - not time-dependent."
)
assert sensor_key not in MINUTE_UPDATE_ENTITY_KEYS, (
f"Volatility sensor '{sensor_key}' should NOT use Timer #3\n"
"Volatility doesn't need minute-by-minute updates."
)
def test_diagnostic_sensors_not_in_timers() -> None:
"""
Test that diagnostic/metadata sensors do NOT use timers.
Diagnostic sensors report static metadata or system state.
They only update when configuration changes or new API data arrives.
"""
diagnostic_sensors = [
"data_last_updated",
"home_id",
"currency_code",
"price_unit",
"grid_company",
"price_level",
"address_line1",
"address_line2",
"address_line3",
"zip_code",
"city",
"country",
"latitude",
"longitude",
"time_zone",
"estimated_annual_consumption",
"subscription_status",
"chart_data_export",
]
for sensor_key in diagnostic_sensors:
# Skip data_lifecycle_status - it needs quarter-hour updates
if sensor_key == "data_lifecycle_status":
continue
assert sensor_key not in TIME_SENSITIVE_ENTITY_KEYS, (
f"Diagnostic sensor '{sensor_key}' should NOT use Timer #2\nDiagnostic data doesn't depend on current time."
)
assert sensor_key not in MINUTE_UPDATE_ENTITY_KEYS, (
f"Diagnostic sensor '{sensor_key}' should NOT use Timer #3\n"
"Diagnostic data doesn't need minute-by-minute updates."
)
def test_timer_constants_are_comprehensive() -> None:
"""
Test that timer constants account for all time-dependent sensors.
Verifies no time-dependent sensors are missing from timer groups.
This is a safety check to catch sensors that need timers but don't have them.
"""
all_sensor_keys = {desc.key for desc in ENTITY_DESCRIPTIONS}
all_binary_sensor_keys = {desc.key for desc in BINARY_SENSOR_ENTITY_DESCRIPTIONS}
all_entity_keys = all_sensor_keys | all_binary_sensor_keys
sensors_with_timers = TIME_SENSITIVE_ENTITY_KEYS | MINUTE_UPDATE_ENTITY_KEYS
# Expected time-dependent sensor patterns
time_dependent_patterns = [
"current_",
"next_",
"previous_",
"trailing_",
"leading_",
"_remaining_",
"_progress",
"_next_in_", # Corrected from _next_start_
"_end_time",
"_period", # Binary sensors checking if NOW is in period
"price_trend_",
"next_avg_",
]
# Known exceptions that look time-dependent but aren't
known_exceptions = {
"data_last_updated", # Timestamp of last update, not time-dependent
"next_24h_volatility", # Uses fixed 24h window from current time, updated on API data
"current_interval_price_major", # Duplicate of current_interval_price (just different unit)
"best_price_period_duration", # Duration in minutes, doesn't change minute-by-minute
"peak_price_period_duration", # Duration in minutes, doesn't change minute-by-minute
}
potentially_missing = [
sensor_key
for sensor_key in all_entity_keys
if (
any(pattern in sensor_key for pattern in time_dependent_patterns)
and sensor_key not in sensors_with_timers
and sensor_key not in known_exceptions
)
]
assert not potentially_missing, (
f"These sensors appear time-dependent but aren't in any timer group:\n"
f"{potentially_missing}\n\n"
"If they truly need time-based updates, add them to TIME_SENSITIVE_ENTITY_KEYS\n"
"or MINUTE_UPDATE_ENTITY_KEYS in coordinator/constants.py"
)
def test_timer_group_sizes() -> None:
"""
Test timer group sizes as documentation/regression detection.
This isn't a strict requirement, but significant changes in group sizes
might indicate accidental additions/removals.
"""
# As of Nov 2025
expected_time_sensitive_min = 40 # At least 40 sensors
expected_minute_update = 6 # Exactly 6 timing sensors
assert len(TIME_SENSITIVE_ENTITY_KEYS) >= expected_time_sensitive_min, (
f"Expected at least {expected_time_sensitive_min} TIME_SENSITIVE sensors, got {len(TIME_SENSITIVE_ENTITY_KEYS)}"
)
assert len(MINUTE_UPDATE_ENTITY_KEYS) == expected_minute_update, (
f"Expected exactly {expected_minute_update} MINUTE_UPDATE sensors, got {len(MINUTE_UPDATE_ENTITY_KEYS)}"
)

View file

@ -0,0 +1,266 @@
"""
Test timer scheduling for entity updates at correct intervals.
This tests the three-timer architecture:
- Timer #1: API polling (15 min, random offset) - tested in test_next_api_poll.py
- Timer #2: Quarter-hour entity refresh (:00, :15, :30, :45)
- Timer #3: Timing sensors refresh (:00, :30 every minute)
See docs/development/timer-architecture.md for architecture overview.
"""
from datetime import UTC, datetime
from typing import Any
from unittest.mock import MagicMock, patch
import pytest
from custom_components.tibber_prices.coordinator.constants import (
QUARTER_HOUR_BOUNDARIES,
)
from custom_components.tibber_prices.coordinator.listeners import (
TibberPricesListenerManager,
)
from homeassistant.core import HomeAssistant
@pytest.fixture
def hass_mock() -> HomeAssistant:
"""Create a mock HomeAssistant instance."""
return MagicMock(spec=HomeAssistant)
@pytest.fixture
def listener_manager(hass_mock: HomeAssistant) -> TibberPricesListenerManager:
"""Create a ListenerManager instance for testing."""
return TibberPricesListenerManager(hass_mock, log_prefix="test_home")
def test_schedule_quarter_hour_refresh_registers_timer(
listener_manager: TibberPricesListenerManager,
) -> None:
"""
Test that quarter-hour refresh registers timer with correct boundaries.
Timer #2 should trigger at :00, :15, :30, :45 exactly.
"""
handler = MagicMock()
with patch("custom_components.tibber_prices.coordinator.listeners.async_track_utc_time_change") as mock_track:
mock_track.return_value = MagicMock() # Simulated cancel callback
listener_manager.schedule_quarter_hour_refresh(handler)
# Verify async_track_utc_time_change was called with correct parameters
mock_track.assert_called_once()
args, kwargs = mock_track.call_args
# Check positional arguments
assert args[0] == listener_manager.hass # hass instance
assert args[1] == handler # callback function
# Check keyword arguments
assert "minute" in kwargs
assert "second" in kwargs
assert kwargs["minute"] == (0, 15, 30, 45) # QUARTER_HOUR_BOUNDARIES
assert kwargs["second"] == 0 # Exact boundary
def test_schedule_quarter_hour_refresh_cancels_existing_timer(
listener_manager: TibberPricesListenerManager,
) -> None:
"""Test that scheduling quarter-hour refresh cancels any existing timer."""
handler = MagicMock()
cancel_mock = MagicMock()
with patch("custom_components.tibber_prices.coordinator.listeners.async_track_utc_time_change") as mock_track:
mock_track.return_value = cancel_mock
# Schedule first timer
listener_manager.schedule_quarter_hour_refresh(handler)
first_cancel = listener_manager._quarter_hour_timer_cancel # noqa: SLF001 # type: ignore[attr-defined]
assert first_cancel is not None
# Schedule second timer (should cancel first)
listener_manager.schedule_quarter_hour_refresh(handler)
# Verify cancel was called
cancel_mock.assert_called_once()
def test_schedule_minute_refresh_registers_timer(
listener_manager: TibberPricesListenerManager,
) -> None:
"""
Test that minute refresh registers timer with correct 30-second boundaries.
Timer #3 should trigger at :XX:00 and :XX:30 every minute.
"""
handler = MagicMock()
with patch("custom_components.tibber_prices.coordinator.listeners.async_track_utc_time_change") as mock_track:
mock_track.return_value = MagicMock() # Simulated cancel callback
listener_manager.schedule_minute_refresh(handler)
# Verify async_track_utc_time_change was called with correct parameters
mock_track.assert_called_once()
args, kwargs = mock_track.call_args
# Check positional arguments
assert args[0] == listener_manager.hass # hass instance
assert args[1] == handler # callback function
# Check keyword arguments
assert "second" in kwargs
assert kwargs["second"] == [0, 30] # Every 30 seconds
def test_schedule_minute_refresh_cancels_existing_timer(
listener_manager: TibberPricesListenerManager,
) -> None:
"""Test that scheduling minute refresh cancels any existing timer."""
handler = MagicMock()
cancel_mock = MagicMock()
with patch("custom_components.tibber_prices.coordinator.listeners.async_track_utc_time_change") as mock_track:
mock_track.return_value = cancel_mock
# Schedule first timer
listener_manager.schedule_minute_refresh(handler)
first_cancel = listener_manager._minute_timer_cancel # noqa: SLF001 # type: ignore[attr-defined]
assert first_cancel is not None
# Schedule second timer (should cancel first)
listener_manager.schedule_minute_refresh(handler)
# Verify cancel was called
cancel_mock.assert_called_once()
def test_quarter_hour_timer_boundaries_match_constants(
listener_manager: TibberPricesListenerManager,
) -> None:
"""
Test that timer boundaries match QUARTER_HOUR_BOUNDARIES constant.
This ensures Timer #2 triggers match the expected quarter-hour marks.
"""
handler = MagicMock()
with patch("custom_components.tibber_prices.coordinator.listeners.async_track_utc_time_change") as mock_track:
mock_track.return_value = MagicMock()
listener_manager.schedule_quarter_hour_refresh(handler)
_, kwargs = mock_track.call_args
assert kwargs["minute"] == QUARTER_HOUR_BOUNDARIES
@pytest.mark.asyncio
async def test_quarter_hour_callback_execution(
listener_manager: TibberPricesListenerManager,
) -> None:
"""
Test that quarter-hour timer callback is executed when scheduled time arrives.
This simulates Home Assistant triggering the callback at quarter-hour boundary.
"""
callback_executed = False
callback_time = None
def test_callback(now: datetime) -> None:
nonlocal callback_executed, callback_time
callback_executed = True
callback_time = now
# We need to actually trigger the callback to test execution
with patch("custom_components.tibber_prices.coordinator.listeners.async_track_utc_time_change") as mock_track:
# Capture the callback that would be registered
registered_callback = None
def capture_callback(_hass: Any, callback: Any, **_kwargs: Any) -> Any:
nonlocal registered_callback
registered_callback = callback
return MagicMock() # Cancel function
mock_track.side_effect = capture_callback
listener_manager.schedule_quarter_hour_refresh(test_callback)
# Simulate Home Assistant triggering the callback
assert registered_callback is not None
test_time = datetime(2025, 11, 22, 14, 15, 0, tzinfo=UTC)
registered_callback(test_time)
# Verify callback was executed
assert callback_executed
assert callback_time == test_time
@pytest.mark.asyncio
async def test_minute_callback_execution(
listener_manager: TibberPricesListenerManager,
) -> None:
"""
Test that minute timer callback is executed when scheduled time arrives.
This simulates Home Assistant triggering the callback at 30-second boundary.
"""
callback_executed = False
callback_time = None
def test_callback(now: datetime) -> None:
nonlocal callback_executed, callback_time
callback_executed = True
callback_time = now
with patch("custom_components.tibber_prices.coordinator.listeners.async_track_utc_time_change") as mock_track:
# Capture the callback that would be registered
registered_callback = None
def capture_callback(_hass: Any, callback: Any, **_kwargs: Any) -> Any:
nonlocal registered_callback
registered_callback = callback
return MagicMock() # Cancel function
mock_track.side_effect = capture_callback
listener_manager.schedule_minute_refresh(test_callback)
# Simulate Home Assistant triggering the callback at :30 seconds
assert registered_callback is not None
test_time = datetime(2025, 11, 22, 14, 23, 30, tzinfo=UTC)
registered_callback(test_time)
# Verify callback was executed
assert callback_executed
assert callback_time == test_time
def test_multiple_timer_independence(
listener_manager: TibberPricesListenerManager,
) -> None:
"""
Test that quarter-hour and minute timers operate independently.
Both timers should be able to coexist without interfering.
"""
quarter_handler = MagicMock()
minute_handler = MagicMock()
with patch("custom_components.tibber_prices.coordinator.listeners.async_track_utc_time_change") as mock_track:
mock_track.return_value = MagicMock()
# Schedule both timers
listener_manager.schedule_quarter_hour_refresh(quarter_handler)
listener_manager.schedule_minute_refresh(minute_handler)
# Verify both were registered (implementation detail check)
assert hasattr(listener_manager, "_quarter_hour_timer_cancel")
assert hasattr(listener_manager, "_minute_timer_cancel")
assert listener_manager._quarter_hour_timer_cancel is not None # noqa: SLF001 # type: ignore[attr-defined]
assert listener_manager._minute_timer_cancel is not None # noqa: SLF001 # type: ignore[attr-defined]
# Verify async_track_utc_time_change was called twice
assert mock_track.call_count == 2