diff --git a/tests/test_next_api_poll.py b/tests/test_next_api_poll.py new file mode 100644 index 0000000..211f7ca --- /dev/null +++ b/tests/test_next_api_poll.py @@ -0,0 +1,320 @@ +""" +Unit tests for next_api_poll_time calculation logic. + +Tests the precise minute/second offset calculation for Timer #1 scheduling, +ensuring accurate prediction of when the next API poll will occur. +""" + +from __future__ import annotations + +from datetime import datetime +from unittest.mock import Mock +from zoneinfo import ZoneInfo + +import pytest + +from custom_components.tibber_prices.coordinator.constants import UPDATE_INTERVAL +from custom_components.tibber_prices.sensor.calculators.lifecycle import ( + TibberPricesLifecycleCalculator, +) + + +@pytest.mark.unit +def test_next_api_poll_before_13_with_timer_offset() -> None: + """ + Test next_api_poll before 13:00 with known timer offset. + + Scenario: Timer runs at X:04:37 (4 minutes 37 seconds past quarter-hour) + Current time: 10:19:37 (before 13:00) + Expected: Next poll at 13:04:37 (first timer execution at or after 13:00) + """ + # Mock coordinator with timer history + coordinator = Mock() + coordinator.time = Mock() + + # Current time: 10:19:37 (Timer just ran) + current_time = datetime(2025, 11, 22, 10, 19, 37, tzinfo=ZoneInfo("Europe/Oslo")) + coordinator.time.now.return_value = current_time + coordinator.time.as_local.side_effect = lambda dt: dt + + # Mock get_day_boundaries (needed to determine if tomorrow data is missing) + today_midnight = datetime(2025, 11, 22, 0, 0, 0, tzinfo=ZoneInfo("Europe/Oslo")) + tomorrow_midnight = datetime(2025, 11, 23, 0, 0, 0, tzinfo=ZoneInfo("Europe/Oslo")) + coordinator.time.get_day_boundaries.return_value = (today_midnight, tomorrow_midnight) + + # Timer last ran at 10:19:37 (offset: 4 min 37 sec past quarter) + coordinator._last_coordinator_update = current_time # noqa: SLF001 + + # Mock coordinator.data (no tomorrow data yet) + coordinator.data = {"priceInfo": {"today": [1, 2, 3], "tomorrow": []}} + + # Mock _needs_tomorrow_data (not relevant for this case) + coordinator._needs_tomorrow_data.return_value = False # noqa: SLF001 + + # Create calculator + calculator = TibberPricesLifecycleCalculator(coordinator) + + # Calculate next poll + next_poll = calculator.get_next_api_poll_time() + + # Should be 13:04:37 (first timer at or after 13:00 with same offset) + assert next_poll is not None + assert next_poll.hour == 13 + assert next_poll.minute == 4 + assert next_poll.second == 37 + + +@pytest.mark.unit +def test_next_api_poll_before_13_different_offset() -> None: + """ + Test next_api_poll with different timer offset. + + Scenario: Timer runs at X:11:22 (11 minutes 22 seconds past quarter-hour) + Current time: 09:26:22 + Expected: Next poll at 13:11:22 + """ + coordinator = Mock() + coordinator.time = Mock() + + current_time = datetime(2025, 11, 22, 9, 26, 22, tzinfo=ZoneInfo("Europe/Oslo")) + coordinator.time.now.return_value = current_time + coordinator.time.as_local.side_effect = lambda dt: dt + + # Mock get_day_boundaries + today_midnight = datetime(2025, 11, 22, 0, 0, 0, tzinfo=ZoneInfo("Europe/Oslo")) + tomorrow_midnight = datetime(2025, 11, 23, 0, 0, 0, tzinfo=ZoneInfo("Europe/Oslo")) + coordinator.time.get_day_boundaries.return_value = (today_midnight, tomorrow_midnight) + + coordinator._last_coordinator_update = current_time # noqa: SLF001 + coordinator.data = {"priceInfo": {"today": [1, 2, 3], "tomorrow": []}} + coordinator._needs_tomorrow_data.return_value = False # noqa: SLF001 + + calculator = TibberPricesLifecycleCalculator(coordinator) + next_poll = calculator.get_next_api_poll_time() + + assert next_poll is not None + assert next_poll.hour == 13 + assert next_poll.minute == 11 + assert next_poll.second == 22 + + +@pytest.mark.unit +def test_next_api_poll_before_13_offset_requires_14xx() -> None: + """ + Test next_api_poll when timer offset doesn't fit in 13:xx hour. + + Scenario: Timer runs at X:58:15 (58 minutes past hour, 13 min past 45-min mark) + Current time: 11:58:15 + Expected: Next poll at 13:13:15 (13:00+13min, 13:15+13min, 13:30+13min, 13:45+13min) + Note: Even extreme offsets fit in 13:xx hour, 14:xx overflow is theoretical edge case + """ + coordinator = Mock() + coordinator.time = Mock() + + current_time = datetime(2025, 11, 22, 11, 58, 15, tzinfo=ZoneInfo("Europe/Oslo")) + coordinator.time.now.return_value = current_time + coordinator.time.as_local.side_effect = lambda dt: dt + + # Mock get_day_boundaries + today_midnight = datetime(2025, 11, 22, 0, 0, 0, tzinfo=ZoneInfo("Europe/Oslo")) + tomorrow_midnight = datetime(2025, 11, 23, 0, 0, 0, tzinfo=ZoneInfo("Europe/Oslo")) + coordinator.time.get_day_boundaries.return_value = (today_midnight, tomorrow_midnight) + + # Timer offset: 58 % 15 = 13 minutes past quarter-hour + coordinator._last_coordinator_update = current_time # noqa: SLF001 + coordinator.data = {"priceInfo": {"today": [1, 2, 3], "tomorrow": []}} + coordinator._needs_tomorrow_data.return_value = False # noqa: SLF001 + + calculator = TibberPricesLifecycleCalculator(coordinator) + next_poll = calculator.get_next_api_poll_time() + + # Even with 13-minute offset, first valid is 13:13:15 + assert next_poll is not None + assert next_poll.hour == 13 + assert next_poll.minute == 13 + assert next_poll.second == 15 + + +@pytest.mark.unit +def test_next_api_poll_before_13_no_timer_history() -> None: + """ + Test next_api_poll fallback when no timer history exists. + + Scenario: Integration just started, no _last_coordinator_update yet + Current time: 10:30:00 + Expected: Fallback to 13:00:00 + """ + coordinator = Mock() + coordinator.time = Mock() + + current_time = datetime(2025, 11, 22, 10, 30, 0, tzinfo=ZoneInfo("Europe/Oslo")) + coordinator.time.now.return_value = current_time + coordinator.time.as_local.side_effect = lambda dt: dt + + # Mock get_day_boundaries + today_midnight = datetime(2025, 11, 22, 0, 0, 0, tzinfo=ZoneInfo("Europe/Oslo")) + tomorrow_midnight = datetime(2025, 11, 23, 0, 0, 0, tzinfo=ZoneInfo("Europe/Oslo")) + coordinator.time.get_day_boundaries.return_value = (today_midnight, tomorrow_midnight) + + # No timer history + coordinator._last_coordinator_update = None # noqa: SLF001 + coordinator.data = {"priceInfo": {"today": [1, 2, 3], "tomorrow": []}} + coordinator._needs_tomorrow_data.return_value = False # noqa: SLF001 + + calculator = TibberPricesLifecycleCalculator(coordinator) + next_poll = calculator.get_next_api_poll_time() + + # Should fallback to 13:00:00 + assert next_poll is not None + assert next_poll.hour == 13 + assert next_poll.minute == 0 + assert next_poll.second == 0 + + +@pytest.mark.unit +def test_next_api_poll_after_13_tomorrow_missing() -> None: + """ + Test next_api_poll after 13:00 when tomorrow data is missing. + + Scenario: After 13:00, actively polling for tomorrow data + Current time: 14:30:00 + Last update: 14:15:45 + Expected: Last update + UPDATE_INTERVAL (15 minutes) = 14:30:45 + """ + coordinator = Mock() + coordinator.time = Mock() + + current_time = datetime(2025, 11, 22, 14, 30, 0, tzinfo=ZoneInfo("Europe/Oslo")) + last_update = datetime(2025, 11, 22, 14, 15, 45, tzinfo=ZoneInfo("Europe/Oslo")) + + coordinator.time.now.return_value = current_time + coordinator.time.as_local.side_effect = lambda dt: dt + + # Mock get_day_boundaries + today_midnight = datetime(2025, 11, 22, 0, 0, 0, tzinfo=ZoneInfo("Europe/Oslo")) + tomorrow_midnight = datetime(2025, 11, 23, 0, 0, 0, tzinfo=ZoneInfo("Europe/Oslo")) + coordinator.time.get_day_boundaries.return_value = (today_midnight, tomorrow_midnight) + + coordinator._last_coordinator_update = last_update # noqa: SLF001 + coordinator.data = {"priceInfo": {"today": [1, 2, 3], "tomorrow": []}} # Tomorrow missing! + coordinator._needs_tomorrow_data.return_value = True # noqa: SLF001 - Tomorrow missing! + + calculator = TibberPricesLifecycleCalculator(coordinator) + next_poll = calculator.get_next_api_poll_time() + + # Should be last_update + 15 minutes + expected = last_update + UPDATE_INTERVAL + assert next_poll is not None + assert next_poll == expected + assert next_poll.minute == 30 + assert next_poll.second == 45 + + +@pytest.mark.unit +def test_next_api_poll_after_13_tomorrow_present() -> None: + """ + Test next_api_poll after 13:00 when tomorrow data is present. + + Scenario: After 13:00, tomorrow data fetched, predicting tomorrow's first poll + Current time: 15:34:12 + Timer offset: 4 minutes 12 seconds past quarter (from 15:34:12) + Expected: Tomorrow at 13:04:12 + """ + coordinator = Mock() + coordinator.time = Mock() + + current_time = datetime(2025, 11, 22, 15, 34, 12, tzinfo=ZoneInfo("Europe/Oslo")) + coordinator.time.now.return_value = current_time + coordinator.time.as_local.side_effect = lambda dt: dt + + # Mock get_day_boundaries + today_midnight = datetime(2025, 11, 22, 0, 0, 0, tzinfo=ZoneInfo("Europe/Oslo")) + tomorrow_midnight = datetime(2025, 11, 23, 0, 0, 0, tzinfo=ZoneInfo("Europe/Oslo")) + coordinator.time.get_day_boundaries.return_value = (today_midnight, tomorrow_midnight) + + # Timer offset: 34 % 15 = 4 minutes past quarter-hour + coordinator._last_coordinator_update = current_time # noqa: SLF001 + coordinator.data = {"priceInfo": {"today": [1, 2, 3], "tomorrow": [4, 5, 6]}} # Tomorrow present! + coordinator._needs_tomorrow_data.return_value = False # noqa: SLF001 - Tomorrow present! + + calculator = TibberPricesLifecycleCalculator(coordinator) + next_poll = calculator.get_next_api_poll_time() + + # Should be tomorrow at 13:04:12 + assert next_poll is not None + assert next_poll.day == 23 # Tomorrow + assert next_poll.hour == 13 + assert next_poll.minute == 4 + assert next_poll.second == 12 + + +@pytest.mark.unit +def test_next_api_poll_exact_13_00_boundary() -> None: + """ + Test next_api_poll exactly at 13:00:00 boundary. + + Scenario: Timer runs exactly at 13:00:00 (offset: 0 min 0 sec) + Current time: 13:00:00 + Expected: 13:00:00 (current time matches first valid slot) + """ + coordinator = Mock() + coordinator.time = Mock() + + current_time = datetime(2025, 11, 22, 13, 0, 0, tzinfo=ZoneInfo("Europe/Oslo")) + coordinator.time.now.return_value = current_time + coordinator.time.as_local.side_effect = lambda dt: dt + + # Mock get_day_boundaries + today_midnight = datetime(2025, 11, 22, 0, 0, 0, tzinfo=ZoneInfo("Europe/Oslo")) + tomorrow_midnight = datetime(2025, 11, 23, 0, 0, 0, tzinfo=ZoneInfo("Europe/Oslo")) + coordinator.time.get_day_boundaries.return_value = (today_midnight, tomorrow_midnight) + + # Timer runs at exact quarter-hour boundaries + coordinator._last_coordinator_update = current_time # noqa: SLF001 + coordinator.data = {"priceInfo": {"today": [1, 2, 3], "tomorrow": []}} + coordinator._needs_tomorrow_data.return_value = False # noqa: SLF001 + + calculator = TibberPricesLifecycleCalculator(coordinator) + next_poll = calculator.get_next_api_poll_time() + + # Should be 13:00:00 (first valid slot) + assert next_poll is not None + assert next_poll.hour == 13 + assert next_poll.minute == 0 + assert next_poll.second == 0 + + +@pytest.mark.unit +def test_next_api_poll_offset_spans_multiple_quarters() -> None: + """ + Test timer offset calculation across different quarter-hour marks. + + Scenario: Timer at 12:47:33 (offset: 2 min 33 sec past 45-min mark) + Expected: 13:02:33, 13:17:33, 13:32:33, or 13:47:33 depending on >= 13:00 + Result: First valid is 13:02:33 + """ + coordinator = Mock() + coordinator.time = Mock() + + current_time = datetime(2025, 11, 22, 12, 47, 33, tzinfo=ZoneInfo("Europe/Oslo")) + coordinator.time.now.return_value = current_time + coordinator.time.as_local.side_effect = lambda dt: dt + + # Mock get_day_boundaries + today_midnight = datetime(2025, 11, 22, 0, 0, 0, tzinfo=ZoneInfo("Europe/Oslo")) + tomorrow_midnight = datetime(2025, 11, 23, 0, 0, 0, tzinfo=ZoneInfo("Europe/Oslo")) + coordinator.time.get_day_boundaries.return_value = (today_midnight, tomorrow_midnight) + + # Timer offset: 47 % 15 = 2 minutes past quarter + coordinator._last_coordinator_update = current_time # noqa: SLF001 + coordinator.data = {"priceInfo": {"today": [1, 2, 3], "tomorrow": []}} + coordinator._needs_tomorrow_data.return_value = False # noqa: SLF001 + + calculator = TibberPricesLifecycleCalculator(coordinator) + next_poll = calculator.get_next_api_poll_time() + + # Should be 13:02:33 (first quarter-hour slot >= 13:00 with offset 2:33) + assert next_poll is not None + assert next_poll.hour == 13 + assert next_poll.minute == 2 + assert next_poll.second == 33 diff --git a/tests/test_sensor_timer_assignment.py b/tests/test_sensor_timer_assignment.py new file mode 100644 index 0000000..526c390 --- /dev/null +++ b/tests/test_sensor_timer_assignment.py @@ -0,0 +1,449 @@ +""" +Test sensor-to-timer assignment correctness. + +This tests the CRITICAL mapping between sensor entities and update timers: +- TIME_SENSITIVE sensors → Timer #2 (quarter-hour: :00, :15, :30, :45) +- MINUTE_UPDATE sensors → Timer #3 (minute: :00, :30) +- All other sensors → No timer (only update on API data arrival) + +Ensures: +1. Each sensor is assigned to the correct timer +2. Timer constants match sensor definitions +3. No sensors are missing from or incorrectly added to timer groups +""" + +from custom_components.tibber_prices.binary_sensor.definitions import ( + ENTITY_DESCRIPTIONS as BINARY_SENSOR_ENTITY_DESCRIPTIONS, +) +from custom_components.tibber_prices.coordinator.constants import ( + MINUTE_UPDATE_ENTITY_KEYS, + TIME_SENSITIVE_ENTITY_KEYS, +) +from custom_components.tibber_prices.sensor.definitions import ENTITY_DESCRIPTIONS + + +def test_time_sensitive_sensors_are_valid() -> None: + """ + Test that all TIME_SENSITIVE_ENTITY_KEYS correspond to actual sensors. + + Timer #2 (quarter-hour) should only trigger for sensors that exist. + """ + all_sensor_keys = {desc.key for desc in ENTITY_DESCRIPTIONS} + all_binary_sensor_keys = {desc.key for desc in BINARY_SENSOR_ENTITY_DESCRIPTIONS} + all_entity_keys = all_sensor_keys | all_binary_sensor_keys + + for entity_key in TIME_SENSITIVE_ENTITY_KEYS: + assert entity_key in all_entity_keys, ( + f"TIME_SENSITIVE key '{entity_key}' not found in sensor/binary_sensor definitions" + ) + + +def test_minute_update_sensors_are_valid() -> None: + """ + Test that all MINUTE_UPDATE_ENTITY_KEYS correspond to actual sensors. + + Timer #3 (minute) should only trigger for sensors that exist. + """ + all_sensor_keys = {desc.key for desc in ENTITY_DESCRIPTIONS} + all_binary_sensor_keys = {desc.key for desc in BINARY_SENSOR_ENTITY_DESCRIPTIONS} + all_entity_keys = all_sensor_keys | all_binary_sensor_keys + + for entity_key in MINUTE_UPDATE_ENTITY_KEYS: + assert entity_key in all_entity_keys, ( + f"MINUTE_UPDATE key '{entity_key}' not found in sensor/binary_sensor definitions" + ) + + +def test_no_overlap_between_timer_groups() -> None: + """ + Test that TIME_SENSITIVE and MINUTE_UPDATE groups are mutually exclusive. + + A sensor should never be in both timer groups simultaneously. + This would cause duplicate updates and wasted resources. + """ + overlap = TIME_SENSITIVE_ENTITY_KEYS & MINUTE_UPDATE_ENTITY_KEYS + + assert not overlap, ( + f"Sensors should not be in both TIME_SENSITIVE and MINUTE_UPDATE: {overlap}\n" + "Each sensor should use only ONE timer for updates." + ) + + +def test_interval_sensors_use_quarter_hour_timer() -> None: + """ + Test that interval-based sensors (current/next/previous) use Timer #2. + + These sensors need updates every 15 minutes because they reference + specific 15-minute intervals that change at quarter-hour boundaries. + """ + interval_sensors = [ + "current_interval_price", + "next_interval_price", + "previous_interval_price", + "current_interval_price_level", + "next_interval_price_level", + "previous_interval_price_level", + "current_interval_price_rating", + "next_interval_price_rating", + "previous_interval_price_rating", + ] + + for sensor_key in interval_sensors: + assert sensor_key in TIME_SENSITIVE_ENTITY_KEYS, ( + f"Interval sensor '{sensor_key}' should be TIME_SENSITIVE (Timer #2)" + ) + + +def test_rolling_hour_sensors_use_quarter_hour_timer() -> None: + """ + Test that rolling hour sensors (5-interval windows) use Timer #2. + + Rolling hour calculations depend on current interval position, + which changes every 15 minutes. + """ + rolling_hour_sensors = [ + "current_hour_average_price", + "next_hour_average_price", + "current_hour_price_level", + "next_hour_price_level", + "current_hour_price_rating", + "next_hour_price_rating", + ] + + for sensor_key in rolling_hour_sensors: + assert sensor_key in TIME_SENSITIVE_ENTITY_KEYS, ( + f"Rolling hour sensor '{sensor_key}' should be TIME_SENSITIVE (Timer #2)" + ) + + +def test_future_avg_sensors_use_quarter_hour_timer() -> None: + """ + Test that future N-hour average sensors use Timer #2. + + Future averages calculate rolling windows starting from "next interval", + which changes every 15 minutes. + """ + future_avg_sensors = [ + "next_avg_1h", + "next_avg_2h", + "next_avg_3h", + "next_avg_4h", + "next_avg_5h", + "next_avg_6h", + "next_avg_8h", + "next_avg_12h", + ] + + for sensor_key in future_avg_sensors: + assert sensor_key in TIME_SENSITIVE_ENTITY_KEYS, ( + f"Future avg sensor '{sensor_key}' should be TIME_SENSITIVE (Timer #2)" + ) + + +def test_trend_sensors_use_quarter_hour_timer() -> None: + """ + Test that price trend sensors use Timer #2. + + Trend analysis depends on current interval position and + needs updates at quarter-hour boundaries. + """ + trend_sensors = [ + "current_price_trend", + "next_price_trend_change", + "price_trend_1h", + "price_trend_2h", + "price_trend_3h", + "price_trend_4h", + "price_trend_5h", + "price_trend_6h", + "price_trend_8h", + "price_trend_12h", + ] + + for sensor_key in trend_sensors: + assert sensor_key in TIME_SENSITIVE_ENTITY_KEYS, ( + f"Trend sensor '{sensor_key}' should be TIME_SENSITIVE (Timer #2)" + ) + + +def test_window_24h_sensors_use_quarter_hour_timer() -> None: + """ + Test that trailing/leading 24h window sensors use Timer #2. + + 24h windows are calculated relative to current interval, + which changes every 15 minutes. + """ + window_24h_sensors = [ + "trailing_price_average", + "leading_price_average", + "trailing_price_min", + "trailing_price_max", + "leading_price_min", + "leading_price_max", + ] + + for sensor_key in window_24h_sensors: + assert sensor_key in TIME_SENSITIVE_ENTITY_KEYS, ( + f"24h window sensor '{sensor_key}' should be TIME_SENSITIVE (Timer #2)" + ) + + +def test_period_binary_sensors_use_quarter_hour_timer() -> None: + """ + Test that best/peak price period binary sensors use Timer #2. + + Binary sensors check if current time is within a period. + Periods can only change at quarter-hour interval boundaries. + """ + period_binary_sensors = [ + "best_price_period", + "peak_price_period", + ] + + for sensor_key in period_binary_sensors: + assert sensor_key in TIME_SENSITIVE_ENTITY_KEYS, ( + f"Period binary sensor '{sensor_key}' should be TIME_SENSITIVE (Timer #2)" + ) + + +def test_period_timestamp_sensors_use_quarter_hour_timer() -> None: + """ + Test that period timestamp sensors (end_time, next_start_time) use Timer #2. + + Timestamp sensors report when periods end/start. Since periods can only + change at quarter-hour boundaries (intervals), they only need quarter-hour updates. + """ + timestamp_sensors = [ + "best_price_end_time", + "best_price_next_start_time", + "peak_price_end_time", + "peak_price_next_start_time", + ] + + for sensor_key in timestamp_sensors: + assert sensor_key in TIME_SENSITIVE_ENTITY_KEYS, ( + f"Timestamp sensor '{sensor_key}' should be TIME_SENSITIVE (Timer #2)" + ) + + +def test_timing_sensors_use_minute_timer() -> None: + """ + Test that countdown/progress timing sensors use Timer #3. + + These sensors track time remaining and progress percentage within periods. + They need minute-by-minute updates for accurate countdown displays. + + IMPORTANT: Timestamp sensors (end_time, next_start_time) do NOT use Timer #3 + because periods can only change at quarter-hour boundaries. + """ + timing_sensors = [ + "best_price_remaining_minutes", + "best_price_progress", + "best_price_next_in_minutes", # Corrected from best_price_next_start_minutes + "peak_price_remaining_minutes", + "peak_price_progress", + "peak_price_next_in_minutes", # Corrected from peak_price_next_start_minutes + ] + + for sensor_key in timing_sensors: + assert sensor_key in MINUTE_UPDATE_ENTITY_KEYS, ( + f"Timing sensor '{sensor_key}' should be MINUTE_UPDATE (Timer #3)" + ) + + # Also verify it's NOT in TIME_SENSITIVE (no double updates) + assert sensor_key not in TIME_SENSITIVE_ENTITY_KEYS, ( + f"Timing sensor '{sensor_key}' should NOT be in TIME_SENSITIVE\n" + "Minute updates are sufficient for countdown/progress tracking." + ) + + +def test_lifecycle_sensor_uses_quarter_hour_timer() -> None: + """ + Test that data lifecycle status sensor uses Timer #2. + + The lifecycle sensor needs quarter-hour updates to detect: + - Turnover pending at 23:45 (quarter-hour boundary) + - Turnover completed after midnight API update + """ + assert "data_lifecycle_status" in TIME_SENSITIVE_ENTITY_KEYS, ( + "Lifecycle sensor needs quarter-hour updates to detect turnover_pending\n" + "at 23:45 (last interval before midnight)" + ) + + +def test_daily_stat_sensors_not_in_timers() -> None: + """ + Test that daily statistic sensors (min/max/avg) do NOT use timers. + + Daily stats don't depend on current time - they represent full-day aggregates. + They only need updates when new API data arrives (not time-dependent). + """ + daily_stat_sensors = [ + # Today/tomorrow min prices + "daily_min_price_today", + "daily_min_price_tomorrow", + # Today/tomorrow max prices + "daily_max_price_today", + "daily_max_price_tomorrow", + # Today/tomorrow averages + "daily_average_price_today", + "daily_average_price_tomorrow", + # Daily price levels + "daily_price_level_today", + "daily_price_level_tomorrow", + # Daily price ratings + "daily_price_rating_today", + "daily_price_rating_tomorrow", + ] + + for sensor_key in daily_stat_sensors: + assert sensor_key not in TIME_SENSITIVE_ENTITY_KEYS, ( + f"Daily stat sensor '{sensor_key}' should NOT use Timer #2\n" + "Daily statistics don't depend on current time - only on API data arrival." + ) + assert sensor_key not in MINUTE_UPDATE_ENTITY_KEYS, ( + f"Daily stat sensor '{sensor_key}' should NOT use Timer #3\n" + "Daily statistics don't need minute-by-minute updates." + ) + + +def test_volatility_sensors_not_in_timers() -> None: + """ + Test that volatility sensors do NOT use timers. + + Volatility analyzes price variation over fixed time windows. + Values only change when new API data arrives (not time-dependent). + """ + volatility_sensors = [ + "today_volatility_level", + "tomorrow_volatility_level", + "yesterday_volatility_level", + "next_24h_volatility_level", + ] + + for sensor_key in volatility_sensors: + assert sensor_key not in TIME_SENSITIVE_ENTITY_KEYS, ( + f"Volatility sensor '{sensor_key}' should NOT use Timer #2\n" + "Volatility calculates over fixed time windows - not time-dependent." + ) + assert sensor_key not in MINUTE_UPDATE_ENTITY_KEYS, ( + f"Volatility sensor '{sensor_key}' should NOT use Timer #3\n" + "Volatility doesn't need minute-by-minute updates." + ) + + +def test_diagnostic_sensors_not_in_timers() -> None: + """ + Test that diagnostic/metadata sensors do NOT use timers. + + Diagnostic sensors report static metadata or system state. + They only update when configuration changes or new API data arrives. + """ + diagnostic_sensors = [ + "data_last_updated", + "home_id", + "currency_code", + "price_unit", + "grid_company", + "price_level", + "address_line1", + "address_line2", + "address_line3", + "zip_code", + "city", + "country", + "latitude", + "longitude", + "time_zone", + "estimated_annual_consumption", + "subscription_status", + "chart_data_export", + ] + + for sensor_key in diagnostic_sensors: + # Skip data_lifecycle_status - it needs quarter-hour updates + if sensor_key == "data_lifecycle_status": + continue + + assert sensor_key not in TIME_SENSITIVE_ENTITY_KEYS, ( + f"Diagnostic sensor '{sensor_key}' should NOT use Timer #2\nDiagnostic data doesn't depend on current time." + ) + assert sensor_key not in MINUTE_UPDATE_ENTITY_KEYS, ( + f"Diagnostic sensor '{sensor_key}' should NOT use Timer #3\n" + "Diagnostic data doesn't need minute-by-minute updates." + ) + + +def test_timer_constants_are_comprehensive() -> None: + """ + Test that timer constants account for all time-dependent sensors. + + Verifies no time-dependent sensors are missing from timer groups. + This is a safety check to catch sensors that need timers but don't have them. + """ + all_sensor_keys = {desc.key for desc in ENTITY_DESCRIPTIONS} + all_binary_sensor_keys = {desc.key for desc in BINARY_SENSOR_ENTITY_DESCRIPTIONS} + all_entity_keys = all_sensor_keys | all_binary_sensor_keys + sensors_with_timers = TIME_SENSITIVE_ENTITY_KEYS | MINUTE_UPDATE_ENTITY_KEYS + + # Expected time-dependent sensor patterns + time_dependent_patterns = [ + "current_", + "next_", + "previous_", + "trailing_", + "leading_", + "_remaining_", + "_progress", + "_next_in_", # Corrected from _next_start_ + "_end_time", + "_period", # Binary sensors checking if NOW is in period + "price_trend_", + "next_avg_", + ] + + # Known exceptions that look time-dependent but aren't + known_exceptions = { + "data_last_updated", # Timestamp of last update, not time-dependent + "next_24h_volatility", # Uses fixed 24h window from current time, updated on API data + "current_interval_price_major", # Duplicate of current_interval_price (just different unit) + "best_price_period_duration", # Duration in minutes, doesn't change minute-by-minute + "peak_price_period_duration", # Duration in minutes, doesn't change minute-by-minute + } + + potentially_missing = [ + sensor_key + for sensor_key in all_entity_keys + if ( + any(pattern in sensor_key for pattern in time_dependent_patterns) + and sensor_key not in sensors_with_timers + and sensor_key not in known_exceptions + ) + ] + + assert not potentially_missing, ( + f"These sensors appear time-dependent but aren't in any timer group:\n" + f"{potentially_missing}\n\n" + "If they truly need time-based updates, add them to TIME_SENSITIVE_ENTITY_KEYS\n" + "or MINUTE_UPDATE_ENTITY_KEYS in coordinator/constants.py" + ) + + +def test_timer_group_sizes() -> None: + """ + Test timer group sizes as documentation/regression detection. + + This isn't a strict requirement, but significant changes in group sizes + might indicate accidental additions/removals. + """ + # As of Nov 2025 + expected_time_sensitive_min = 40 # At least 40 sensors + expected_minute_update = 6 # Exactly 6 timing sensors + + assert len(TIME_SENSITIVE_ENTITY_KEYS) >= expected_time_sensitive_min, ( + f"Expected at least {expected_time_sensitive_min} TIME_SENSITIVE sensors, got {len(TIME_SENSITIVE_ENTITY_KEYS)}" + ) + + assert len(MINUTE_UPDATE_ENTITY_KEYS) == expected_minute_update, ( + f"Expected exactly {expected_minute_update} MINUTE_UPDATE sensors, got {len(MINUTE_UPDATE_ENTITY_KEYS)}" + ) diff --git a/tests/test_timer_scheduling.py b/tests/test_timer_scheduling.py new file mode 100644 index 0000000..c14d39d --- /dev/null +++ b/tests/test_timer_scheduling.py @@ -0,0 +1,266 @@ +""" +Test timer scheduling for entity updates at correct intervals. + +This tests the three-timer architecture: +- Timer #1: API polling (15 min, random offset) - tested in test_next_api_poll.py +- Timer #2: Quarter-hour entity refresh (:00, :15, :30, :45) +- Timer #3: Timing sensors refresh (:00, :30 every minute) + +See docs/development/timer-architecture.md for architecture overview. +""" + +from datetime import UTC, datetime +from typing import Any +from unittest.mock import MagicMock, patch + +import pytest + +from custom_components.tibber_prices.coordinator.constants import ( + QUARTER_HOUR_BOUNDARIES, +) +from custom_components.tibber_prices.coordinator.listeners import ( + TibberPricesListenerManager, +) +from homeassistant.core import HomeAssistant + + +@pytest.fixture +def hass_mock() -> HomeAssistant: + """Create a mock HomeAssistant instance.""" + return MagicMock(spec=HomeAssistant) + + +@pytest.fixture +def listener_manager(hass_mock: HomeAssistant) -> TibberPricesListenerManager: + """Create a ListenerManager instance for testing.""" + return TibberPricesListenerManager(hass_mock, log_prefix="test_home") + + +def test_schedule_quarter_hour_refresh_registers_timer( + listener_manager: TibberPricesListenerManager, +) -> None: + """ + Test that quarter-hour refresh registers timer with correct boundaries. + + Timer #2 should trigger at :00, :15, :30, :45 exactly. + """ + handler = MagicMock() + + with patch("custom_components.tibber_prices.coordinator.listeners.async_track_utc_time_change") as mock_track: + mock_track.return_value = MagicMock() # Simulated cancel callback + + listener_manager.schedule_quarter_hour_refresh(handler) + + # Verify async_track_utc_time_change was called with correct parameters + mock_track.assert_called_once() + args, kwargs = mock_track.call_args + + # Check positional arguments + assert args[0] == listener_manager.hass # hass instance + assert args[1] == handler # callback function + + # Check keyword arguments + assert "minute" in kwargs + assert "second" in kwargs + assert kwargs["minute"] == (0, 15, 30, 45) # QUARTER_HOUR_BOUNDARIES + assert kwargs["second"] == 0 # Exact boundary + + +def test_schedule_quarter_hour_refresh_cancels_existing_timer( + listener_manager: TibberPricesListenerManager, +) -> None: + """Test that scheduling quarter-hour refresh cancels any existing timer.""" + handler = MagicMock() + cancel_mock = MagicMock() + + with patch("custom_components.tibber_prices.coordinator.listeners.async_track_utc_time_change") as mock_track: + mock_track.return_value = cancel_mock + + # Schedule first timer + listener_manager.schedule_quarter_hour_refresh(handler) + first_cancel = listener_manager._quarter_hour_timer_cancel # noqa: SLF001 # type: ignore[attr-defined] + assert first_cancel is not None + + # Schedule second timer (should cancel first) + listener_manager.schedule_quarter_hour_refresh(handler) + + # Verify cancel was called + cancel_mock.assert_called_once() + + +def test_schedule_minute_refresh_registers_timer( + listener_manager: TibberPricesListenerManager, +) -> None: + """ + Test that minute refresh registers timer with correct 30-second boundaries. + + Timer #3 should trigger at :XX:00 and :XX:30 every minute. + """ + handler = MagicMock() + + with patch("custom_components.tibber_prices.coordinator.listeners.async_track_utc_time_change") as mock_track: + mock_track.return_value = MagicMock() # Simulated cancel callback + + listener_manager.schedule_minute_refresh(handler) + + # Verify async_track_utc_time_change was called with correct parameters + mock_track.assert_called_once() + args, kwargs = mock_track.call_args + + # Check positional arguments + assert args[0] == listener_manager.hass # hass instance + assert args[1] == handler # callback function + + # Check keyword arguments + assert "second" in kwargs + assert kwargs["second"] == [0, 30] # Every 30 seconds + + +def test_schedule_minute_refresh_cancels_existing_timer( + listener_manager: TibberPricesListenerManager, +) -> None: + """Test that scheduling minute refresh cancels any existing timer.""" + handler = MagicMock() + cancel_mock = MagicMock() + + with patch("custom_components.tibber_prices.coordinator.listeners.async_track_utc_time_change") as mock_track: + mock_track.return_value = cancel_mock + + # Schedule first timer + listener_manager.schedule_minute_refresh(handler) + first_cancel = listener_manager._minute_timer_cancel # noqa: SLF001 # type: ignore[attr-defined] + assert first_cancel is not None + + # Schedule second timer (should cancel first) + listener_manager.schedule_minute_refresh(handler) + + # Verify cancel was called + cancel_mock.assert_called_once() + + +def test_quarter_hour_timer_boundaries_match_constants( + listener_manager: TibberPricesListenerManager, +) -> None: + """ + Test that timer boundaries match QUARTER_HOUR_BOUNDARIES constant. + + This ensures Timer #2 triggers match the expected quarter-hour marks. + """ + handler = MagicMock() + + with patch("custom_components.tibber_prices.coordinator.listeners.async_track_utc_time_change") as mock_track: + mock_track.return_value = MagicMock() + + listener_manager.schedule_quarter_hour_refresh(handler) + + _, kwargs = mock_track.call_args + assert kwargs["minute"] == QUARTER_HOUR_BOUNDARIES + + +@pytest.mark.asyncio +async def test_quarter_hour_callback_execution( + listener_manager: TibberPricesListenerManager, +) -> None: + """ + Test that quarter-hour timer callback is executed when scheduled time arrives. + + This simulates Home Assistant triggering the callback at quarter-hour boundary. + """ + callback_executed = False + callback_time = None + + def test_callback(now: datetime) -> None: + nonlocal callback_executed, callback_time + callback_executed = True + callback_time = now + + # We need to actually trigger the callback to test execution + with patch("custom_components.tibber_prices.coordinator.listeners.async_track_utc_time_change") as mock_track: + # Capture the callback that would be registered + registered_callback = None + + def capture_callback(_hass: Any, callback: Any, **_kwargs: Any) -> Any: + nonlocal registered_callback + registered_callback = callback + return MagicMock() # Cancel function + + mock_track.side_effect = capture_callback + + listener_manager.schedule_quarter_hour_refresh(test_callback) + + # Simulate Home Assistant triggering the callback + assert registered_callback is not None + test_time = datetime(2025, 11, 22, 14, 15, 0, tzinfo=UTC) + registered_callback(test_time) + + # Verify callback was executed + assert callback_executed + assert callback_time == test_time + + +@pytest.mark.asyncio +async def test_minute_callback_execution( + listener_manager: TibberPricesListenerManager, +) -> None: + """ + Test that minute timer callback is executed when scheduled time arrives. + + This simulates Home Assistant triggering the callback at 30-second boundary. + """ + callback_executed = False + callback_time = None + + def test_callback(now: datetime) -> None: + nonlocal callback_executed, callback_time + callback_executed = True + callback_time = now + + with patch("custom_components.tibber_prices.coordinator.listeners.async_track_utc_time_change") as mock_track: + # Capture the callback that would be registered + registered_callback = None + + def capture_callback(_hass: Any, callback: Any, **_kwargs: Any) -> Any: + nonlocal registered_callback + registered_callback = callback + return MagicMock() # Cancel function + + mock_track.side_effect = capture_callback + + listener_manager.schedule_minute_refresh(test_callback) + + # Simulate Home Assistant triggering the callback at :30 seconds + assert registered_callback is not None + test_time = datetime(2025, 11, 22, 14, 23, 30, tzinfo=UTC) + registered_callback(test_time) + + # Verify callback was executed + assert callback_executed + assert callback_time == test_time + + +def test_multiple_timer_independence( + listener_manager: TibberPricesListenerManager, +) -> None: + """ + Test that quarter-hour and minute timers operate independently. + + Both timers should be able to coexist without interfering. + """ + quarter_handler = MagicMock() + minute_handler = MagicMock() + + with patch("custom_components.tibber_prices.coordinator.listeners.async_track_utc_time_change") as mock_track: + mock_track.return_value = MagicMock() + + # Schedule both timers + listener_manager.schedule_quarter_hour_refresh(quarter_handler) + listener_manager.schedule_minute_refresh(minute_handler) + + # Verify both were registered (implementation detail check) + assert hasattr(listener_manager, "_quarter_hour_timer_cancel") + assert hasattr(listener_manager, "_minute_timer_cancel") + assert listener_manager._quarter_hour_timer_cancel is not None # noqa: SLF001 # type: ignore[attr-defined] + assert listener_manager._minute_timer_cancel is not None # noqa: SLF001 # type: ignore[attr-defined] + + # Verify async_track_utc_time_change was called twice + assert mock_track.call_count == 2