mirror of
https://github.com/jpawlowski/hass.tibber_prices.git
synced 2026-03-29 21:03:40 +00:00
fix(sensors): ensure connection/tomorrow_data/lifecycle consistency
Fixed state inconsistencies during auth errors.

Bug #4: tomorrow_data_available incorrectly returns True during auth failure
- Now returns None (unknown) when coordinator.last_exception is ConfigEntryAuthFailed
- Prevents a misleading "data available" state when the API connection is lost

Bug #5: Sensor states inconsistent during error conditions
- connection: False during auth error (even with cached data)
- tomorrow_data_available: None during auth error (cannot verify)
- lifecycle_status: "error" during auth error

Changes:
- binary_sensor/core.py: check last_exception before evaluating tomorrow data
- tests: 25 integration tests covering all error scenarios

Impact: all three sensors show consistent states during auth errors, API timeouts,
and normal operation. No misleading "available" status when the connection is lost.
This commit is contained in:
parent
85fe9666a7
commit
c7f6843c5b
3 changed files with 841 additions and 1 deletions
|
|
@ -5,6 +5,7 @@ from __future__ import annotations
|
|||
from typing import TYPE_CHECKING
|
||||
|
||||
from custom_components.tibber_prices.coordinator import TIME_SENSITIVE_ENTITY_KEYS
|
||||
from custom_components.tibber_prices.coordinator.core import get_connection_state
|
||||
from custom_components.tibber_prices.entity import TibberPricesEntity
|
||||
from custom_components.tibber_prices.entity_utils import get_binary_sensor_icon
|
||||
from homeassistant.components.binary_sensor import (
|
||||
|
|
@ -12,6 +13,7 @@ from homeassistant.components.binary_sensor import (
|
|||
BinarySensorEntityDescription,
|
||||
)
|
||||
from homeassistant.core import callback
|
||||
from homeassistant.exceptions import ConfigEntryAuthFailed
|
||||
|
||||
from .attributes import (
|
||||
build_async_extra_state_attributes,
|
||||
|
|
@ -44,6 +46,11 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
|
|||
self._attr_unique_id = f"{coordinator.config_entry.entry_id}_{entity_description.key}"
|
||||
self._state_getter: Callable | None = self._get_value_getter()
|
||||
self._time_sensitive_remove_listener: Callable | None = None
|
||||
self._lifecycle_remove_listener: Callable | None = None
|
||||
|
||||
# Register for lifecycle push updates if this sensor depends on connection state
|
||||
if entity_description.key in ("connection", "tomorrow_data_available"):
|
||||
self._lifecycle_remove_listener = coordinator.register_lifecycle_callback(self.async_write_ha_state)
|
||||
|
||||
async def async_added_to_hass(self) -> None:
|
||||
"""When entity is added to hass."""
|
||||
|
|
@ -64,6 +71,11 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
|
|||
self._time_sensitive_remove_listener()
|
||||
self._time_sensitive_remove_listener = None
|
||||
|
||||
# Remove lifecycle listener if registered
|
||||
if self._lifecycle_remove_listener:
|
||||
self._lifecycle_remove_listener()
|
||||
self._lifecycle_remove_listener = None
|
||||
|
||||
@callback
|
||||
def _handle_time_sensitive_update(self, time_service: TibberPricesTimeService) -> None:
|
||||
"""
|
||||
|
|
@ -85,7 +97,7 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
|
|||
state_getters = {
|
||||
"peak_price_period": self._peak_price_state,
|
||||
"best_price_period": self._best_price_state,
|
||||
"connection": lambda: True if self.coordinator.data else None,
|
||||
"connection": lambda: get_connection_state(self.coordinator),
|
||||
"tomorrow_data_available": self._tomorrow_data_available_state,
|
||||
"has_ventilation_system": self._has_ventilation_system_state,
|
||||
"realtime_consumption_enabled": self._realtime_consumption_enabled_state,
|
||||
|
|
@ -123,8 +135,16 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
|
|||
|
||||
def _tomorrow_data_available_state(self) -> bool | None:
|
||||
"""Return True if tomorrow's data is fully available, False if not, None if unknown."""
|
||||
# Auth errors: Cannot reliably check - return unknown
|
||||
# User must fix auth via reauth flow before we can determine tomorrow data availability
|
||||
if isinstance(self.coordinator.last_exception, ConfigEntryAuthFailed):
|
||||
return None
|
||||
|
||||
# No data: unknown state (initializing or error)
|
||||
if not self.coordinator.data:
|
||||
return None
|
||||
|
||||
# Check tomorrow data availability (normal operation)
|
||||
price_info = self.coordinator.data.get("priceInfo", {})
|
||||
tomorrow_prices = price_info.get("tomorrow", [])
|
||||
interval_count = len(tomorrow_prices)
|
||||
|
|
|
|||
385
tests/test_lifecycle_state.py
Normal file
385
tests/test_lifecycle_state.py
Normal file
|
|
@ -0,0 +1,385 @@
|
|||
"""
|
||||
Unit tests for lifecycle state determination.
|
||||
|
||||
Tests the get_lifecycle_state() method which determines the current
|
||||
data lifecycle state shown to users.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime, timedelta
|
||||
from unittest.mock import Mock
|
||||
from zoneinfo import ZoneInfo
|
||||
|
||||
import pytest
|
||||
|
||||
from custom_components.tibber_prices.sensor.calculators.lifecycle import (
|
||||
FRESH_DATA_THRESHOLD_MINUTES,
|
||||
TibberPricesLifecycleCalculator,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.unit
def test_lifecycle_state_fresh() -> None:
    """
    Test lifecycle state is 'fresh' when data is recent.

    Scenario: Last API fetch was 3 minutes ago, before 13:00 (no tomorrow search)
    Expected: State is 'fresh' (< 5 minutes threshold)
    """
    coord = Mock()
    coord.time = Mock()

    # 10:30 local time, i.e. before the 13:00 tomorrow-search window
    now = datetime(2025, 11, 22, 10, 30, 0, tzinfo=ZoneInfo("Europe/Oslo"))
    coord.time.now.return_value = now
    coord.time.as_local.side_effect = lambda dt: dt  # required by the midnight check
    coord._is_fetching = False  # noqa: SLF001
    coord.last_exception = None
    coord._last_price_update = now - timedelta(minutes=3)  # noqa: SLF001

    assert TibberPricesLifecycleCalculator(coord).get_lifecycle_state() == "fresh"


@pytest.mark.unit
def test_lifecycle_state_cached() -> None:
    """
    Test lifecycle state is 'cached' during normal operation.

    Scenario: Last API fetch was 10 minutes ago, no special conditions
    Expected: State is 'cached' (normal operation)
    """
    coord = Mock()
    coord.time = Mock()

    now = datetime(2025, 11, 22, 14, 30, 0, tzinfo=ZoneInfo("Europe/Oslo"))
    coord.time.now.return_value = now
    coord.time.as_local.side_effect = lambda dt: dt
    coord._is_fetching = False  # noqa: SLF001
    coord.last_exception = None
    coord._last_price_update = now - timedelta(minutes=10)  # noqa: SLF001

    # Day boundaries consumed by the midnight/turnover check
    midnight = datetime(2025, 11, 22, 0, 0, 0, tzinfo=ZoneInfo("Europe/Oslo"))
    coord.time.get_day_boundaries.return_value = (midnight, midnight + timedelta(days=1))

    # Not in tomorrow-search mode (before 13:00)
    coord._needs_tomorrow_data.return_value = False  # noqa: SLF001

    assert TibberPricesLifecycleCalculator(coord).get_lifecycle_state() == "cached"


@pytest.mark.unit
def test_lifecycle_state_refreshing() -> None:
    """
    Test lifecycle state is 'refreshing' during API call.

    Scenario: Coordinator is currently fetching data
    Expected: State is 'refreshing' (highest priority)
    """
    coord = Mock()
    coord.time = Mock()
    coord.time.now.return_value = datetime(2025, 11, 22, 14, 30, 0, tzinfo=ZoneInfo("Europe/Oslo"))
    coord._is_fetching = True  # noqa: SLF001 - fetch currently in progress
    coord.last_exception = None

    assert TibberPricesLifecycleCalculator(coord).get_lifecycle_state() == "refreshing"


@pytest.mark.unit
def test_lifecycle_state_error() -> None:
    """
    Test lifecycle state is 'error' after failed API call.

    Scenario: Last API call failed, exception is set
    Expected: State is 'error' (high priority)
    """
    coord = Mock()
    coord.time = Mock()
    coord.time.now.return_value = datetime(2025, 11, 22, 14, 30, 0, tzinfo=ZoneInfo("Europe/Oslo"))
    coord._is_fetching = False  # noqa: SLF001
    coord.last_exception = Exception("API Error")  # previous refresh failed

    assert TibberPricesLifecycleCalculator(coord).get_lifecycle_state() == "error"
|
||||
|
||||
|
||||
@pytest.mark.unit
def test_lifecycle_state_searching_tomorrow() -> None:
    """
    Test lifecycle state is 'searching_tomorrow' after 13:00 without tomorrow data.

    Scenario: Current time is 15:00, tomorrow data is missing
    Expected: State is 'searching_tomorrow'
    """
    coord = Mock()
    coord.time = Mock()

    # 15:00 local time (after the 13:00 tomorrow check hour)
    now = datetime(2025, 11, 22, 15, 0, 0, tzinfo=ZoneInfo("Europe/Oslo"))
    coord.time.now.return_value = now
    coord.time.as_local.side_effect = lambda dt: dt
    coord._is_fetching = False  # noqa: SLF001
    coord.last_exception = None
    coord._last_price_update = now - timedelta(minutes=10)  # noqa: SLF001

    # Day boundaries consumed by the midnight/turnover check
    midnight = datetime(2025, 11, 22, 0, 0, 0, tzinfo=ZoneInfo("Europe/Oslo"))
    coord.time.get_day_boundaries.return_value = (midnight, midnight + timedelta(days=1))

    coord._needs_tomorrow_data.return_value = True  # noqa: SLF001 - tomorrow data missing

    assert TibberPricesLifecycleCalculator(coord).get_lifecycle_state() == "searching_tomorrow"


@pytest.mark.unit
def test_lifecycle_state_turnover_pending() -> None:
    """
    Test lifecycle state is 'turnover_pending' shortly before midnight.

    Scenario: Current time is 23:57 (3 minutes before midnight)
    Expected: State is 'turnover_pending' (< 5 minutes threshold)
    """
    coord = Mock()
    coord.time = Mock()

    # 23:57 - three minutes before midnight
    now = datetime(2025, 11, 22, 23, 57, 0, tzinfo=ZoneInfo("Europe/Oslo"))
    coord.time.now.return_value = now
    coord.time.as_local.side_effect = lambda dt: dt
    coord._is_fetching = False  # noqa: SLF001
    coord.last_exception = None
    coord._last_price_update = now - timedelta(minutes=10)  # noqa: SLF001

    assert TibberPricesLifecycleCalculator(coord).get_lifecycle_state() == "turnover_pending"
|
||||
|
||||
|
||||
@pytest.mark.unit
def test_lifecycle_state_priority_error_over_turnover() -> None:
    """
    Test that 'error' state has higher priority than 'turnover_pending'.

    Scenario: Error occurred + approaching midnight
    Expected: State is 'error' (not turnover_pending)

    Priority: error (2) > turnover_pending (3)
    """
    coord = Mock()
    coord.time = Mock()

    # 23:58 - two minutes before midnight, but the last refresh failed
    coord.time.now.return_value = datetime(2025, 11, 22, 23, 58, 0, tzinfo=ZoneInfo("Europe/Oslo"))
    coord.time.as_local.side_effect = lambda dt: dt
    coord._is_fetching = False  # noqa: SLF001
    coord.last_exception = Exception("API Error")  # error outranks turnover

    assert TibberPricesLifecycleCalculator(coord).get_lifecycle_state() == "error"


@pytest.mark.unit
def test_lifecycle_state_priority_turnover_over_searching() -> None:
    """
    Test that 'turnover_pending' has higher priority than 'searching_tomorrow'.

    Scenario: 23:57 (approaching midnight) + after 13:00 + tomorrow missing
    Expected: State is 'turnover_pending' (not searching_tomorrow)

    Priority: turnover_pending (3) > searching_tomorrow (4)
    """
    coord = Mock()
    coord.time = Mock()

    # 23:57 - three minutes before midnight, tomorrow data still missing
    coord.time.now.return_value = datetime(2025, 11, 22, 23, 57, 0, tzinfo=ZoneInfo("Europe/Oslo"))
    coord.time.as_local.side_effect = lambda dt: dt
    coord._is_fetching = False  # noqa: SLF001
    coord.last_exception = None

    # Day boundaries consumed by the midnight/turnover check
    midnight = datetime(2025, 11, 22, 0, 0, 0, tzinfo=ZoneInfo("Europe/Oslo"))
    coord.time.get_day_boundaries.return_value = (midnight, midnight + timedelta(days=1))

    coord._needs_tomorrow_data.return_value = True  # noqa: SLF001 - tomorrow data missing

    assert TibberPricesLifecycleCalculator(coord).get_lifecycle_state() == "turnover_pending"
|
||||
|
||||
|
||||
@pytest.mark.unit
def test_lifecycle_state_priority_searching_over_fresh() -> None:
    """
    Test that 'searching_tomorrow' has higher priority than 'fresh'.

    Scenario: 15:00 (after 13:00) + tomorrow missing + data just fetched (2 min ago)
    Expected: State is 'searching_tomorrow' (not fresh)

    Priority: searching_tomorrow (4) > fresh (5)

    This prevents state flickering during the search phase: without the
    priority the state would alternate between searching_tomorrow and fresh
    every few minutes; with it, searching_tomorrow stays stable until the
    tomorrow data arrives.
    """
    coord = Mock()
    coord.time = Mock()

    # 15:00 (after the 13:00 tomorrow check hour), data fetched 2 minutes ago
    now = datetime(2025, 11, 22, 15, 0, 0, tzinfo=ZoneInfo("Europe/Oslo"))
    coord.time.now.return_value = now
    coord.time.as_local.side_effect = lambda dt: dt
    coord._is_fetching = False  # noqa: SLF001
    coord.last_exception = None
    coord._last_price_update = now - timedelta(minutes=2)  # noqa: SLF001 - data is fresh

    # Day boundaries consumed by the midnight/turnover check
    midnight = datetime(2025, 11, 22, 0, 0, 0, tzinfo=ZoneInfo("Europe/Oslo"))
    coord.time.get_day_boundaries.return_value = (midnight, midnight + timedelta(days=1))

    coord._needs_tomorrow_data.return_value = True  # noqa: SLF001 - tomorrow data missing

    # searching_tomorrow (not fresh) to avoid flickering
    assert TibberPricesLifecycleCalculator(coord).get_lifecycle_state() == "searching_tomorrow"


@pytest.mark.unit
def test_lifecycle_state_priority_turnover_over_fresh() -> None:
    """
    Test that 'turnover_pending' has higher priority than 'fresh'.

    Scenario: 23:57 (approaching midnight) + data just fetched (2 min ago)
    Expected: State is 'turnover_pending' (not fresh)

    Priority: turnover_pending (3) > fresh (5)
    """
    coord = Mock()
    coord.time = Mock()

    # 23:57, data fetched 2 minutes ago (still fresh)
    now = datetime(2025, 11, 22, 23, 57, 0, tzinfo=ZoneInfo("Europe/Oslo"))
    coord.time.now.return_value = now
    coord.time.as_local.side_effect = lambda dt: dt
    coord._is_fetching = False  # noqa: SLF001
    coord.last_exception = None
    coord._last_price_update = now - timedelta(minutes=2)  # noqa: SLF001 - data is fresh

    assert TibberPricesLifecycleCalculator(coord).get_lifecycle_state() == "turnover_pending"


@pytest.mark.unit
def test_lifecycle_state_priority_refreshing_over_all() -> None:
    """
    Test that 'refreshing' state has highest priority.

    Scenario: Currently fetching + error + approaching midnight
    Expected: State is 'refreshing' (checked first)
    """
    coord = Mock()
    coord.time = Mock()

    # 23:58, previous error on record, and a fetch is in progress
    coord.time.now.return_value = datetime(2025, 11, 22, 23, 58, 0, tzinfo=ZoneInfo("Europe/Oslo"))
    coord.time.as_local.side_effect = lambda dt: dt
    coord._is_fetching = True  # noqa: SLF001 - fetch currently in progress
    coord.last_exception = Exception("Previous error")

    assert TibberPricesLifecycleCalculator(coord).get_lifecycle_state() == "refreshing"
|
||||
|
||||
|
||||
@pytest.mark.unit
def test_lifecycle_state_exact_threshold_boundaries() -> None:
    """
    Test lifecycle state exactly at threshold boundaries.

    Scenario 1: Data exactly FRESH_DATA_THRESHOLD_MINUTES old → still 'fresh'
                (the freshness threshold is inclusive, <=)
    Scenario 2: Exactly 5 minutes (300 seconds) to midnight → 'turnover_pending'
    """
    coord = Mock()
    coord.time = Mock()

    # Scenario 1: data age is exactly at the freshness threshold
    now = datetime(2025, 11, 22, 14, 30, 0, tzinfo=ZoneInfo("Europe/Oslo"))
    coord.time.now.return_value = now
    coord.time.as_local.side_effect = lambda dt: dt
    coord._is_fetching = False  # noqa: SLF001
    coord.last_exception = None
    coord._last_price_update = now - timedelta(minutes=FRESH_DATA_THRESHOLD_MINUTES)  # noqa: SLF001

    midnight = datetime(2025, 11, 22, 0, 0, 0, tzinfo=ZoneInfo("Europe/Oslo"))
    coord.time.get_day_boundaries.return_value = (midnight, midnight + timedelta(days=1))
    coord._needs_tomorrow_data.return_value = False  # noqa: SLF001

    # Threshold is inclusive: exactly 5 minutes old still counts as fresh
    assert TibberPricesLifecycleCalculator(coord).get_lifecycle_state() == "fresh"

    # Scenario 2: exactly 5 minutes (300 seconds) before midnight
    turnover_now = datetime(2025, 11, 22, 23, 55, 0, tzinfo=ZoneInfo("Europe/Oslo"))
    coord.time.now.return_value = turnover_now
    coord._last_price_update = turnover_now - timedelta(minutes=10)  # noqa: SLF001

    assert TibberPricesLifecycleCalculator(coord).get_lifecycle_state() == "turnover_pending"
|
||||
435
tests/test_sensor_consistency.py
Normal file
435
tests/test_sensor_consistency.py
Normal file
|
|
@ -0,0 +1,435 @@
|
|||
"""Tests for sensor state consistency between connection, tomorrow_data_available, and lifecycle_status."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import UTC, datetime
|
||||
from typing import TYPE_CHECKING
|
||||
from unittest.mock import Mock
|
||||
|
||||
import pytest
|
||||
|
||||
from custom_components.tibber_prices.binary_sensor.core import (
|
||||
TibberPricesBinarySensor,
|
||||
)
|
||||
from custom_components.tibber_prices.coordinator.core import (
|
||||
TibberPricesDataUpdateCoordinator,
|
||||
get_connection_state,
|
||||
)
|
||||
from custom_components.tibber_prices.sensor.calculators.lifecycle import (
|
||||
TibberPricesLifecycleCalculator,
|
||||
)
|
||||
from homeassistant.components.binary_sensor import BinarySensorEntityDescription
|
||||
from homeassistant.exceptions import ConfigEntryAuthFailed
|
||||
from homeassistant.helpers.update_coordinator import UpdateFailed
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from unittest.mock import Mock as MockType
|
||||
|
||||
|
||||
def create_mock_coordinator() -> Mock:
    """
    Create a properly mocked coordinator for entity initialization.

    Includes all attributes required by TibberPricesEntity.__init__:
    - hass.config.language (for translations)
    - config_entry.data, .unique_id, .entry_id (for device info)
    - get_user_profile() (for home information)
    """
    mock = Mock(spec=TibberPricesDataUpdateCoordinator)
    mock.data = None
    mock.last_exception = None
    mock._is_fetching = False  # noqa: SLF001

    # hass with a language for translation lookups
    mock.hass = Mock()
    mock.hass.config.language = "en"

    # Config entry attributes consumed when building device info
    mock.config_entry = Mock()
    mock.config_entry.data = {}
    mock.config_entry.entry_id = "test_entry_id"
    mock.config_entry.unique_id = "test_home_id"

    # Minimal user profile for home information
    mock.get_user_profile.return_value = {
        "home": {
            "appNickname": "Test Home",
            "type": "APARTMENT",
        }
    }

    return mock


@pytest.fixture
def mock_coordinator() -> MockType:
    """Fixture providing a properly mocked coordinator."""
    return create_mock_coordinator()
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Connection State Tests (get_connection_state helper)
|
||||
# =============================================================================
|
||||
|
||||
|
||||
def test_connection_state_auth_failed(mock_coordinator: MockType) -> None:
    """Test connection state when auth fails - should be False (disconnected)."""
    mock_coordinator.data = {"priceInfo": {"today": []}}  # cached data is present
    mock_coordinator.last_exception = ConfigEntryAuthFailed("Invalid token")

    # Auth failure is definitive: disconnected even though a cache exists
    assert get_connection_state(mock_coordinator) is False


def test_connection_state_api_error_with_cache(mock_coordinator: MockType) -> None:
    """Test connection state when API errors but cache available - should be True (using cache)."""
    mock_coordinator.data = {"priceInfo": {"today": []}}  # cached data is present
    mock_coordinator.last_exception = UpdateFailed("API timeout")

    # Non-auth errors with a cache count as connected (degraded operation)
    assert get_connection_state(mock_coordinator) is True


def test_connection_state_api_error_no_cache(mock_coordinator: MockType) -> None:
    """Test connection state when API errors and no cache - should be None (unknown)."""
    mock_coordinator.data = None
    mock_coordinator.last_exception = UpdateFailed("API timeout")

    # No data plus an error leaves the state unknown
    assert get_connection_state(mock_coordinator) is None


def test_connection_state_normal_operation(mock_coordinator: MockType) -> None:
    """Test connection state during normal operation - should be True (connected)."""
    mock_coordinator.data = {"priceInfo": {"today": []}}
    mock_coordinator.last_exception = None

    # Data present and no error: connected
    assert get_connection_state(mock_coordinator) is True


def test_connection_state_initializing(mock_coordinator: MockType) -> None:
    """Test connection state when initializing - should be None (unknown)."""
    mock_coordinator.data = None
    mock_coordinator.last_exception = None

    # Neither data nor an error yet: still initializing
    assert get_connection_state(mock_coordinator) is None
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Sensor Consistency Tests - Auth Error Scenario
|
||||
# =============================================================================
|
||||
|
||||
|
||||
def test_sensor_consistency_auth_error() -> None:
    """Test all 3 sensors are consistent when auth fails."""
    coord = Mock(spec=TibberPricesDataUpdateCoordinator)
    coord.data = {"priceInfo": {"today": [], "tomorrow": []}}  # cached data
    coord.last_exception = ConfigEntryAuthFailed("Invalid token")
    coord.time = Mock()
    coord._is_fetching = False  # noqa: SLF001

    # Connection sensor: definitively disconnected
    assert get_connection_state(coord) is False, "Connection should be off when auth fails"

    # Lifecycle sensor: error
    lifecycle = TibberPricesLifecycleCalculator(coord).get_lifecycle_state()
    assert lifecycle == "error", "Lifecycle should be 'error' when auth fails"


def test_sensor_consistency_api_error_with_cache() -> None:
    """Test all 3 sensors are consistent when API errors but cache available."""
    coord = Mock(spec=TibberPricesDataUpdateCoordinator)
    coord.data = {"priceInfo": {"today": [], "tomorrow": [{"startsAt": "2025-11-23T00:00:00+01:00"}] * 96}}
    coord.last_exception = UpdateFailed("API timeout")
    coord.time = Mock()
    coord._is_fetching = False  # noqa: SLF001
    coord._last_price_update = datetime(2025, 11, 22, 10, 0, 0, tzinfo=UTC)  # noqa: SLF001

    # Connection sensor: still on while serving from cache
    assert get_connection_state(coord) is True, "Connection should be on when using cache"

    # Lifecycle sensor: error because the most recent fetch failed
    lifecycle = TibberPricesLifecycleCalculator(coord).get_lifecycle_state()
    assert lifecycle == "error", "Lifecycle should be 'error' when last fetch failed"


def test_sensor_consistency_normal_operation() -> None:
    """Test all 3 sensors are consistent during normal operation."""
    coord = Mock(spec=TibberPricesDataUpdateCoordinator)
    coord.data = {"priceInfo": {"today": [], "tomorrow": []}}
    coord.last_exception = None
    coord.time = Mock()
    coord._is_fetching = False  # noqa: SLF001
    coord._last_price_update = datetime(2025, 11, 22, 10, 0, 0, tzinfo=UTC)  # noqa: SLF001

    # Time service mocks used by the lifecycle calculator (15 min after fetch)
    now = datetime(2025, 11, 22, 10, 15, 0, tzinfo=UTC)
    coord.time.now.return_value = now
    coord.time.as_local.return_value = now

    # Connection sensor: on
    assert get_connection_state(coord) is True, "Connection should be on during normal operation"

    # Lifecycle sensor: cached (last fetch older than the 5-minute freshness window)
    lifecycle = TibberPricesLifecycleCalculator(coord).get_lifecycle_state()
    assert lifecycle == "cached", "Lifecycle should be 'cached' during normal operation"


def test_sensor_consistency_refreshing() -> None:
    """Test all 3 sensors are consistent when actively fetching."""
    coord = Mock(spec=TibberPricesDataUpdateCoordinator)
    coord.data = {"priceInfo": {"today": [], "tomorrow": []}}  # previous data
    coord.last_exception = None
    coord.time = Mock()
    coord._is_fetching = True  # noqa: SLF001 - fetch currently in progress
    coord._last_price_update = datetime(2025, 11, 22, 10, 0, 0, tzinfo=UTC)  # noqa: SLF001

    # Connection sensor: on (data present, no error)
    assert get_connection_state(coord) is True, "Connection should be on when refreshing"

    # Lifecycle sensor: refreshing
    lifecycle = TibberPricesLifecycleCalculator(coord).get_lifecycle_state()
    assert lifecycle == "refreshing", "Lifecycle should be 'refreshing' when actively fetching"
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Tomorrow Data Available - Auth Error Handling
|
||||
# =============================================================================
|
||||
|
||||
|
||||
def test_tomorrow_data_available_auth_error_returns_none() -> None:
|
||||
"""Test tomorrow_data_available returns None when auth fails (cannot check)."""
|
||||
coordinator = create_mock_coordinator()
|
||||
coordinator.data = {"priceInfo": {"tomorrow": [{"startsAt": "2025-11-23T00:00:00+01:00"}] * 96}} # Full data
|
||||
coordinator.last_exception = ConfigEntryAuthFailed("Invalid token")
|
||||
coordinator.time = Mock()
|
||||
|
||||
description = BinarySensorEntityDescription(
|
||||
key="tomorrow_data_available",
|
||||
name="Tomorrow Data Available",
|
||||
)
|
||||
|
||||
sensor = TibberPricesBinarySensor(coordinator, description)
|
||||
|
||||
# Even with full tomorrow data, should return None when auth fails
|
||||
state = sensor._tomorrow_data_available_state() # noqa: SLF001
|
||||
assert state is None, "Should return None (unknown) when auth fails, even with cached data"
|
||||
|
||||
|
||||
def test_tomorrow_data_available_no_data_returns_none() -> None:
|
||||
"""Test tomorrow_data_available returns None when no coordinator data."""
|
||||
coordinator = create_mock_coordinator()
|
||||
coordinator.data = None # No data
|
||||
coordinator.last_exception = None
|
||||
coordinator.time = Mock()
|
||||
|
||||
description = BinarySensorEntityDescription(
|
||||
key="tomorrow_data_available",
|
||||
name="Tomorrow Data Available",
|
||||
)
|
||||
|
||||
sensor = TibberPricesBinarySensor(coordinator, description)
|
||||
|
||||
state = sensor._tomorrow_data_available_state() # noqa: SLF001
|
||||
assert state is None, "Should return None when no coordinator data"
|
||||
|
||||
|
||||
def test_tomorrow_data_available_normal_operation_full_data() -> None:
    """A complete tomorrow dataset with no error must yield True."""
    coordinator = create_mock_coordinator()
    coordinator.last_exception = None
    coordinator.data = {"priceInfo": {"tomorrow": [{"startsAt": "2025-11-23T00:00:00+01:00"}] * 96}}

    # Time service stub: the sensor compares interval count against the expected
    # number of intervals for the (local) day.
    mock_time = Mock()
    mock_time.get_local_date.return_value = datetime(2025, 11, 23, tzinfo=UTC).date()
    mock_time.get_expected_intervals_for_day.return_value = 96  # Standard day
    coordinator.time = mock_time

    entity_desc = BinarySensorEntityDescription(
        key="tomorrow_data_available",
        name="Tomorrow Data Available",
    )
    binary_sensor = TibberPricesBinarySensor(coordinator, entity_desc)

    result = binary_sensor._tomorrow_data_available_state()  # noqa: SLF001
    assert result is True, "Should return True when tomorrow data is complete"
def test_tomorrow_data_available_normal_operation_missing_data() -> None:
    """An empty tomorrow dataset with no error must yield False (data simply not published yet)."""
    coordinator = create_mock_coordinator()
    coordinator.last_exception = None
    coordinator.data = {"priceInfo": {"tomorrow": []}}  # No tomorrow data

    mock_time = Mock()
    mock_time.get_local_date.return_value = datetime(2025, 11, 23, tzinfo=UTC).date()
    mock_time.get_expected_intervals_for_day.return_value = 96
    coordinator.time = mock_time

    entity_desc = BinarySensorEntityDescription(
        key="tomorrow_data_available",
        name="Tomorrow Data Available",
    )
    binary_sensor = TibberPricesBinarySensor(coordinator, entity_desc)

    result = binary_sensor._tomorrow_data_available_state()  # noqa: SLF001
    assert result is False, "Should return False when tomorrow data is missing"
# =============================================================================
# Integration Tests - Combined Sensor States
# =============================================================================


def test_combined_states_auth_error_scenario() -> None:
    """
    Integration test: Verify all 3 sensors show consistent states during auth error.

    Scenario: API returns 401 Unauthorized, cached data exists
    Expected:
    - connection: False (off)
    - tomorrow_data_available: None (unknown)
    - lifecycle_status: "error"
    """
    # Coordinator holds a fully cached day + tomorrow, but the last fetch hit a 401.
    coordinator = create_mock_coordinator()
    coordinator.data = {
        "priceInfo": {
            "today": [{"startsAt": "2025-11-22T00:00:00+01:00"}] * 96,
            "tomorrow": [{"startsAt": "2025-11-23T00:00:00+01:00"}] * 96,
        }
    }
    coordinator.last_exception = ConfigEntryAuthFailed("Invalid access token")
    coordinator._is_fetching = False  # noqa: SLF001

    mock_time = Mock()
    mock_time.get_local_date.return_value = datetime(2025, 11, 23, tzinfo=UTC).date()
    mock_time.get_expected_intervals_for_day.return_value = 96
    coordinator.time = mock_time

    # 1) Connection sensor: must report disconnected despite the cache.
    assert get_connection_state(coordinator) is False, "Connection must be False on auth error"

    # 2) Tomorrow-data sensor: cannot be verified while auth is broken.
    tomorrow_desc = BinarySensorEntityDescription(
        key="tomorrow_data_available",
        name="Tomorrow Data Available",
    )
    tomorrow_sensor = TibberPricesBinarySensor(coordinator, tomorrow_desc)
    tomorrow_state = tomorrow_sensor._tomorrow_data_available_state()  # noqa: SLF001
    assert tomorrow_state is None, "Tomorrow data must be None (unknown) on auth error"

    # 3) Lifecycle sensor: auth failure is an error state.
    lifecycle_calc = TibberPricesLifecycleCalculator(coordinator)
    assert lifecycle_calc.get_lifecycle_state() == "error", "Lifecycle must be 'error' on auth error"
def test_combined_states_api_error_with_cache_scenario() -> None:
    """
    Integration test: Verify all 3 sensors show consistent states during API error with cache.

    Scenario: API times out, but cached data available
    Expected:
    - connection: True (on - using cache)
    - tomorrow_data_available: True/False (checks cached data)
    - lifecycle_status: "error" (last fetch failed)
    """
    # A transient API failure with an intact cache: unlike an auth failure,
    # the integration keeps serving cached data.
    coordinator = create_mock_coordinator()
    coordinator.data = {
        "priceInfo": {
            "today": [{"startsAt": "2025-11-22T00:00:00+01:00"}] * 96,
            "tomorrow": [{"startsAt": "2025-11-23T00:00:00+01:00"}] * 96,
        }
    }
    coordinator.last_exception = UpdateFailed("API timeout after 30s")
    coordinator._is_fetching = False  # noqa: SLF001
    coordinator._last_price_update = datetime(2025, 11, 22, 10, 0, 0, tzinfo=UTC)  # noqa: SLF001

    mock_time = Mock()
    mock_time.get_local_date.return_value = datetime(2025, 11, 23, tzinfo=UTC).date()
    mock_time.get_expected_intervals_for_day.return_value = 96
    coordinator.time = mock_time

    # 1) Connection sensor: cache keeps the integration "connected".
    assert get_connection_state(coordinator) is True, "Connection must be True when using cache"

    # 2) Tomorrow-data sensor: evaluated normally against the cached data.
    tomorrow_desc = BinarySensorEntityDescription(
        key="tomorrow_data_available",
        name="Tomorrow Data Available",
    )
    tomorrow_sensor = TibberPricesBinarySensor(coordinator, tomorrow_desc)
    tomorrow_state = tomorrow_sensor._tomorrow_data_available_state()  # noqa: SLF001
    assert tomorrow_state is True, "Tomorrow data should check cached data normally"

    # 3) Lifecycle sensor: the failed refresh still surfaces as an error.
    lifecycle_calc = TibberPricesLifecycleCalculator(coordinator)
    assert lifecycle_calc.get_lifecycle_state() == "error", "Lifecycle must be 'error' when last fetch failed"
def test_combined_states_normal_operation_scenario() -> None:
    """
    Integration test: Verify all 3 sensors show consistent states during normal operation.

    Scenario: No errors, data available
    Expected:
    - connection: True (on)
    - tomorrow_data_available: True/False (checks data)
    - lifecycle_status: "cached" or "fresh"
    """
    # Healthy coordinator: no exception, full data, last fetch 10 minutes ago.
    coordinator = create_mock_coordinator()
    coordinator.data = {
        "priceInfo": {
            "today": [{"startsAt": "2025-11-22T00:00:00+01:00"}] * 96,
            "tomorrow": [{"startsAt": "2025-11-23T00:00:00+01:00"}] * 96,
        }
    }
    coordinator.last_exception = None
    coordinator._is_fetching = False  # noqa: SLF001
    coordinator._last_price_update = datetime(2025, 11, 22, 10, 0, 0, tzinfo=UTC)  # noqa: SLF001 - 10 minutes ago

    # Clock frozen 10 minutes after the last update -> past the "fresh" window, so "cached".
    now = datetime(2025, 11, 22, 10, 10, 0, tzinfo=UTC)
    mock_time = Mock()
    mock_time.now.return_value = now
    mock_time.as_local.return_value = now
    mock_time.get_local_date.return_value = datetime(2025, 11, 23, tzinfo=UTC).date()
    mock_time.get_expected_intervals_for_day.return_value = 96
    coordinator.time = mock_time

    # 1) Connection sensor.
    assert get_connection_state(coordinator) is True, "Connection must be True during normal operation"

    # 2) Tomorrow-data sensor.
    tomorrow_desc = BinarySensorEntityDescription(
        key="tomorrow_data_available",
        name="Tomorrow Data Available",
    )
    tomorrow_sensor = TibberPricesBinarySensor(coordinator, tomorrow_desc)
    tomorrow_state = tomorrow_sensor._tomorrow_data_available_state()  # noqa: SLF001
    assert tomorrow_state is True, "Tomorrow data should be available"

    # 3) Lifecycle sensor.
    lifecycle_calc = TibberPricesLifecycleCalculator(coordinator)
    assert lifecycle_calc.get_lifecycle_state() == "cached", "Lifecycle should be 'cached' (>5min since fetch)"
Loading…
Reference in a new issue