mirror of
https://github.com/jpawlowski/hass.tibber_prices.git
synced 2026-03-29 21:03:40 +00:00
Changed from centralized main+subentry coordinator pattern to independent
coordinators per home. Each config entry now manages its own home data
with its own API client and access token.
Architecture changes:
- API Client: async_get_price_info() changed from home_ids: set[str] to home_id: str
* Removed GraphQL alias pattern (home0, home1, ...)
* Single-home query structure without aliasing
* Simplified response parsing (viewer.home instead of viewer.home0)
- Coordinator: Removed main/subentry distinction
* Deleted is_main_entry() and _has_existing_main_coordinator()
* Each coordinator fetches its own data independently
* Removed _find_main_coordinator() and _get_configured_home_ids()
* Simplified _async_update_data() - no subentry logic
* Added _home_id instance variable from config_entry.data
- __init__.py: New _get_access_token() helper
* Handles token retrieval for both parent and subentries
* Subentries find parent entry to get shared access token
* Creates single API client instance per coordinator
- Data structures: Flat single-home format
* Old: {"homes": {home_id: {"price_info": [...]}}}
* New: {"home_id": str, "price_info": [...], "currency": str}
* Attribute name: "periods" → "pricePeriods" (consistent with priceInfo)
- helpers.py: Removed get_configured_home_ids() (no longer needed)
* parse_all_timestamps() updated for single-home structure
Impact: Each home operates independently with its own lifecycle tracking,
caching, and period calculations. Simpler architecture, easier debugging,
better isolation between homes.
464 lines
19 KiB
Python
464 lines
19 KiB
Python
"""Tests for sensor state consistency between connection, tomorrow_data_available, and lifecycle_status."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from datetime import UTC, datetime
|
|
from typing import TYPE_CHECKING
|
|
from unittest.mock import Mock
|
|
|
|
import pytest
|
|
|
|
from custom_components.tibber_prices.binary_sensor.core import (
|
|
TibberPricesBinarySensor,
|
|
)
|
|
from custom_components.tibber_prices.coordinator.core import (
|
|
TibberPricesDataUpdateCoordinator,
|
|
get_connection_state,
|
|
)
|
|
from custom_components.tibber_prices.sensor.calculators.lifecycle import (
|
|
TibberPricesLifecycleCalculator,
|
|
)
|
|
from homeassistant.components.binary_sensor import BinarySensorEntityDescription
|
|
from homeassistant.exceptions import ConfigEntryAuthFailed
|
|
from homeassistant.helpers.update_coordinator import UpdateFailed
|
|
from homeassistant.util import dt as dt_util
|
|
|
|
if TYPE_CHECKING:
|
|
from unittest.mock import Mock as MockType
|
|
|
|
|
|
def create_mock_coordinator() -> Mock:
|
|
"""
|
|
Create a properly mocked coordinator for entity initialization.
|
|
|
|
Includes all attributes required by TibberPricesEntity.__init__:
|
|
- hass.config.language (for translations)
|
|
- config_entry.data, .unique_id, .entry_id (for device info)
|
|
- get_user_profile() (for home information)
|
|
"""
|
|
coordinator = Mock(spec=TibberPricesDataUpdateCoordinator)
|
|
coordinator.data = None
|
|
coordinator.last_exception = None
|
|
coordinator._is_fetching = False # noqa: SLF001
|
|
|
|
# Mock hass for language configuration
|
|
coordinator.hass = Mock()
|
|
coordinator.hass.config.language = "en"
|
|
|
|
# Mock config_entry for entity initialization
|
|
coordinator.config_entry = Mock()
|
|
coordinator.config_entry.data = {}
|
|
coordinator.config_entry.entry_id = "test_entry_id"
|
|
coordinator.config_entry.unique_id = "test_home_id"
|
|
|
|
# Mock user profile method
|
|
coordinator.get_user_profile.return_value = {
|
|
"home": {
|
|
"appNickname": "Test Home",
|
|
"type": "APARTMENT",
|
|
}
|
|
}
|
|
|
|
return coordinator
|
|
|
|
|
|
def create_price_intervals(day_offset: int = 0) -> list[dict]:
|
|
"""Create 96 mock price intervals (quarter-hourly for one day)."""
|
|
# Use CURRENT date so tests work regardless of when they run
|
|
now_local = dt_util.now()
|
|
base_date = now_local.replace(hour=0, minute=0, second=0, microsecond=0)
|
|
|
|
intervals = []
|
|
for i in range(96):
|
|
interval_time = base_date.replace(day=base_date.day + day_offset, hour=i // 4, minute=(i % 4) * 15)
|
|
intervals.append(
|
|
{
|
|
"startsAt": interval_time.isoformat(),
|
|
"total": 20.0 + (i % 10),
|
|
"energy": 18.0 + (i % 10),
|
|
"tax": 2.0,
|
|
"level": "NORMAL",
|
|
}
|
|
)
|
|
return intervals
|
|
|
|
|
|
def create_coordinator_data(*, today: bool = True, tomorrow: bool = False) -> dict:
|
|
"""
|
|
Create coordinator data in the new flat-list format.
|
|
|
|
Args:
|
|
today: Include today's 96 intervals
|
|
tomorrow: Include tomorrow's 96 intervals
|
|
|
|
Returns:
|
|
Dict with flat priceInfo list: {"priceInfo": [...]}
|
|
|
|
"""
|
|
all_intervals = []
|
|
if today:
|
|
all_intervals.extend(create_price_intervals(0)) # Today (offset 0)
|
|
if tomorrow:
|
|
all_intervals.extend(create_price_intervals(1)) # Tomorrow (offset 1)
|
|
|
|
return {"priceInfo": all_intervals}
|
|
|
|
|
|
@pytest.fixture
|
|
def mock_coordinator() -> MockType:
|
|
"""Fixture providing a properly mocked coordinator."""
|
|
return create_mock_coordinator()
|
|
|
|
|
|
# =============================================================================
|
|
# Connection State Tests (get_connection_state helper)
|
|
# =============================================================================
|
|
|
|
|
|
def test_connection_state_auth_failed(mock_coordinator: MockType) -> None:
|
|
"""Test connection state when auth fails - should be False (disconnected)."""
|
|
mock_coordinator.data = create_coordinator_data(today=True, tomorrow=False)
|
|
mock_coordinator.last_exception = ConfigEntryAuthFailed("Invalid token")
|
|
|
|
# Auth failure = definitively disconnected, even with cached data
|
|
assert get_connection_state(mock_coordinator) is False
|
|
|
|
|
|
def test_connection_state_api_error_with_cache(mock_coordinator: MockType) -> None:
|
|
"""Test connection state when API errors but cache available - should be True (using cache)."""
|
|
mock_coordinator.data = create_coordinator_data(today=True, tomorrow=False)
|
|
mock_coordinator.last_exception = UpdateFailed("API timeout")
|
|
|
|
# Other errors with cache = considered connected (degraded operation)
|
|
assert get_connection_state(mock_coordinator) is True
|
|
|
|
|
|
def test_connection_state_api_error_no_cache(mock_coordinator: MockType) -> None:
|
|
"""Test connection state when API errors and no cache - should be None (unknown)."""
|
|
mock_coordinator.data = None # No data
|
|
mock_coordinator.last_exception = UpdateFailed("API timeout")
|
|
|
|
# No data and error = unknown state
|
|
assert get_connection_state(mock_coordinator) is None
|
|
|
|
|
|
def test_connection_state_normal_operation(mock_coordinator: MockType) -> None:
|
|
"""Test connection state during normal operation - should be True (connected)."""
|
|
mock_coordinator.data = create_coordinator_data(today=True, tomorrow=False)
|
|
mock_coordinator.last_exception = None
|
|
|
|
# Normal operation with data = connected
|
|
assert get_connection_state(mock_coordinator) is True
|
|
|
|
|
|
def test_connection_state_initializing(mock_coordinator: MockType) -> None:
|
|
"""Test connection state when initializing - should be None (unknown)."""
|
|
mock_coordinator.data = None
|
|
mock_coordinator.last_exception = None
|
|
|
|
# No data, no error = initializing (unknown)
|
|
assert get_connection_state(mock_coordinator) is None
|
|
|
|
|
|
# =============================================================================
|
|
# Sensor Consistency Tests - Auth Error Scenario
|
|
# =============================================================================
|
|
|
|
|
|
def test_sensor_consistency_auth_error() -> None:
|
|
"""Test all 3 sensors are consistent when auth fails."""
|
|
coordinator = Mock(spec=TibberPricesDataUpdateCoordinator)
|
|
coordinator.data = create_coordinator_data(today=True, tomorrow=False)
|
|
coordinator.last_exception = ConfigEntryAuthFailed("Invalid token")
|
|
coordinator.time = Mock()
|
|
coordinator._is_fetching = False # noqa: SLF001
|
|
|
|
# Connection: Should be False (disconnected)
|
|
connection_state = get_connection_state(coordinator)
|
|
assert connection_state is False, "Connection should be off when auth fails"
|
|
|
|
# Lifecycle: Should be "error"
|
|
lifecycle_calc = TibberPricesLifecycleCalculator(coordinator)
|
|
lifecycle_state = lifecycle_calc.get_lifecycle_state()
|
|
assert lifecycle_state == "error", "Lifecycle should be 'error' when auth fails"
|
|
|
|
|
|
def test_sensor_consistency_api_error_with_cache() -> None:
|
|
"""Test all 3 sensors are consistent when API errors but cache available."""
|
|
coordinator = Mock(spec=TibberPricesDataUpdateCoordinator)
|
|
coordinator.data = create_coordinator_data(today=True, tomorrow=True)
|
|
coordinator.last_exception = UpdateFailed("API timeout")
|
|
coordinator.time = Mock()
|
|
coordinator._is_fetching = False # noqa: SLF001
|
|
coordinator._last_price_update = datetime(2025, 11, 22, 10, 0, 0, tzinfo=UTC) # noqa: SLF001
|
|
|
|
# Connection: Should be True (using cache)
|
|
connection_state = get_connection_state(coordinator)
|
|
assert connection_state is True, "Connection should be on when using cache"
|
|
|
|
# Lifecycle: Should be "error" (last fetch failed)
|
|
lifecycle_calc = TibberPricesLifecycleCalculator(coordinator)
|
|
lifecycle_state = lifecycle_calc.get_lifecycle_state()
|
|
assert lifecycle_state == "error", "Lifecycle should be 'error' when last fetch failed"
|
|
|
|
|
|
def test_sensor_consistency_normal_operation() -> None:
|
|
"""Test all 3 sensors are consistent during normal operation."""
|
|
coordinator = Mock(spec=TibberPricesDataUpdateCoordinator)
|
|
coordinator.data = create_coordinator_data(today=True, tomorrow=False)
|
|
coordinator.last_exception = None
|
|
coordinator.time = Mock()
|
|
coordinator._is_fetching = False # noqa: SLF001
|
|
coordinator._last_price_update = datetime(2025, 11, 22, 10, 0, 0, tzinfo=UTC) # noqa: SLF001
|
|
|
|
# Mock time methods for lifecycle calculator
|
|
now = datetime(2025, 11, 22, 10, 15, 0, tzinfo=UTC)
|
|
coordinator.time.now.return_value = now
|
|
coordinator.time.as_local.return_value = now
|
|
|
|
# Connection: Should be True
|
|
connection_state = get_connection_state(coordinator)
|
|
assert connection_state is True, "Connection should be on during normal operation"
|
|
|
|
# Lifecycle: Should be "cached" (not within 5min of fetch)
|
|
lifecycle_calc = TibberPricesLifecycleCalculator(coordinator)
|
|
lifecycle_state = lifecycle_calc.get_lifecycle_state()
|
|
assert lifecycle_state == "cached", "Lifecycle should be 'cached' during normal operation"
|
|
|
|
|
|
def test_sensor_consistency_refreshing() -> None:
|
|
"""Test all 3 sensors are consistent when actively fetching."""
|
|
coordinator = Mock(spec=TibberPricesDataUpdateCoordinator)
|
|
coordinator.data = create_coordinator_data(today=True, tomorrow=False)
|
|
coordinator.last_exception = None
|
|
coordinator.time = Mock()
|
|
coordinator._is_fetching = True # noqa: SLF001 - Currently fetching
|
|
coordinator._last_price_update = datetime(2025, 11, 22, 10, 0, 0, tzinfo=UTC) # noqa: SLF001
|
|
|
|
# Connection: Should be True (has data, no error)
|
|
connection_state = get_connection_state(coordinator)
|
|
assert connection_state is True, "Connection should be on when refreshing"
|
|
|
|
# Lifecycle: Should be "refreshing"
|
|
lifecycle_calc = TibberPricesLifecycleCalculator(coordinator)
|
|
lifecycle_state = lifecycle_calc.get_lifecycle_state()
|
|
assert lifecycle_state == "refreshing", "Lifecycle should be 'refreshing' when actively fetching"
|
|
|
|
|
|
# =============================================================================
|
|
# Tomorrow Data Available - Auth Error Handling
|
|
# =============================================================================
|
|
|
|
|
|
def test_tomorrow_data_available_auth_error_returns_none() -> None:
|
|
"""Test tomorrow_data_available returns None when auth fails (cannot check)."""
|
|
coordinator = create_mock_coordinator()
|
|
coordinator.data = create_coordinator_data(today=False, tomorrow=True)
|
|
coordinator.last_exception = ConfigEntryAuthFailed("Invalid token")
|
|
coordinator.time = Mock()
|
|
|
|
description = BinarySensorEntityDescription(
|
|
key="tomorrow_data_available",
|
|
name="Tomorrow Data Available",
|
|
)
|
|
|
|
sensor = TibberPricesBinarySensor(coordinator, description)
|
|
|
|
# Even with full tomorrow data, should return None when auth fails
|
|
state = sensor._tomorrow_data_available_state() # noqa: SLF001
|
|
assert state is None, "Should return None (unknown) when auth fails, even with cached data"
|
|
|
|
|
|
def test_tomorrow_data_available_no_data_returns_none() -> None:
|
|
"""Test tomorrow_data_available returns None when no coordinator data."""
|
|
coordinator = create_mock_coordinator()
|
|
coordinator.data = None # No data
|
|
coordinator.last_exception = None
|
|
coordinator.time = Mock()
|
|
|
|
description = BinarySensorEntityDescription(
|
|
key="tomorrow_data_available",
|
|
name="Tomorrow Data Available",
|
|
)
|
|
|
|
sensor = TibberPricesBinarySensor(coordinator, description)
|
|
|
|
state = sensor._tomorrow_data_available_state() # noqa: SLF001
|
|
assert state is None, "Should return None when no coordinator data"
|
|
|
|
|
|
def test_tomorrow_data_available_normal_operation_full_data() -> None:
|
|
"""Test tomorrow_data_available returns True when tomorrow data is complete."""
|
|
coordinator = create_mock_coordinator()
|
|
coordinator.data = create_coordinator_data(today=False, tomorrow=True)
|
|
coordinator.last_exception = None
|
|
|
|
# Mock time service for expected intervals calculation
|
|
now_date = dt_util.now().date()
|
|
time_service = Mock()
|
|
time_service.get_local_date.return_value = now_date
|
|
time_service.get_expected_intervals_for_day.return_value = 96 # Standard day
|
|
coordinator.time = time_service
|
|
|
|
description = BinarySensorEntityDescription(
|
|
key="tomorrow_data_available",
|
|
name="Tomorrow Data Available",
|
|
)
|
|
|
|
sensor = TibberPricesBinarySensor(coordinator, description)
|
|
|
|
state = sensor._tomorrow_data_available_state() # noqa: SLF001
|
|
assert state is True, "Should return True when tomorrow data is complete"
|
|
|
|
|
|
def test_tomorrow_data_available_normal_operation_missing_data() -> None:
|
|
"""Test tomorrow_data_available returns False when tomorrow data is missing."""
|
|
coordinator = create_mock_coordinator()
|
|
coordinator.data = create_coordinator_data(today=True, tomorrow=False) # No tomorrow data
|
|
coordinator.last_exception = None
|
|
|
|
time_service = Mock()
|
|
time_service.get_local_date.return_value = datetime(2025, 11, 23, tzinfo=UTC).date()
|
|
time_service.get_expected_intervals_for_day.return_value = 96
|
|
coordinator.time = time_service
|
|
|
|
description = BinarySensorEntityDescription(
|
|
key="tomorrow_data_available",
|
|
name="Tomorrow Data Available",
|
|
)
|
|
|
|
sensor = TibberPricesBinarySensor(coordinator, description)
|
|
|
|
state = sensor._tomorrow_data_available_state() # noqa: SLF001
|
|
assert state is False, "Should return False when tomorrow data is missing"
|
|
|
|
|
|
# =============================================================================
|
|
# Integration Tests - Combined Sensor States
|
|
# =============================================================================
|
|
|
|
|
|
def test_combined_states_auth_error_scenario() -> None:
|
|
"""
|
|
Integration test: Verify all 3 sensors show consistent states during auth error.
|
|
|
|
Scenario: API returns 401 Unauthorized, cached data exists
|
|
Expected:
|
|
- connection: False (off)
|
|
- tomorrow_data_available: None (unknown)
|
|
- lifecycle_status: "error"
|
|
"""
|
|
# Setup coordinator with auth error state
|
|
coordinator = create_mock_coordinator()
|
|
coordinator.data = create_coordinator_data(today=True, tomorrow=True)
|
|
coordinator.last_exception = ConfigEntryAuthFailed("Invalid access token")
|
|
coordinator._is_fetching = False # noqa: SLF001
|
|
|
|
time_service = Mock()
|
|
time_service.get_local_date.return_value = datetime(2025, 11, 22, tzinfo=UTC).date() # Today is 22nd
|
|
time_service.get_expected_intervals_for_day.return_value = 96
|
|
coordinator.time = time_service
|
|
|
|
# Test 1: Connection state
|
|
connection_state = get_connection_state(coordinator)
|
|
assert connection_state is False, "Connection must be False on auth error"
|
|
|
|
# Test 2: Tomorrow data available state
|
|
tomorrow_desc = BinarySensorEntityDescription(
|
|
key="tomorrow_data_available",
|
|
name="Tomorrow Data Available",
|
|
)
|
|
tomorrow_sensor = TibberPricesBinarySensor(coordinator, tomorrow_desc)
|
|
tomorrow_state = tomorrow_sensor._tomorrow_data_available_state() # noqa: SLF001
|
|
assert tomorrow_state is None, "Tomorrow data must be None (unknown) on auth error"
|
|
|
|
# Test 3: Lifecycle state
|
|
lifecycle_calc = TibberPricesLifecycleCalculator(coordinator)
|
|
lifecycle_state = lifecycle_calc.get_lifecycle_state()
|
|
assert lifecycle_state == "error", "Lifecycle must be 'error' on auth error"
|
|
|
|
|
|
def test_combined_states_api_error_with_cache_scenario() -> None:
|
|
"""
|
|
Integration test: Verify all 3 sensors show consistent states during API error with cache.
|
|
|
|
Scenario: API times out, but cached data available
|
|
Expected:
|
|
- connection: True (on - using cache)
|
|
- tomorrow_data_available: True/False (checks cached data)
|
|
- lifecycle_status: "error" (last fetch failed)
|
|
"""
|
|
# Setup coordinator with API error but cache available
|
|
coordinator = create_mock_coordinator()
|
|
coordinator.data = create_coordinator_data(today=True, tomorrow=True)
|
|
coordinator.last_exception = UpdateFailed("API timeout after 30s")
|
|
coordinator._is_fetching = False # noqa: SLF001
|
|
coordinator._last_price_update = datetime(2025, 11, 22, 10, 0, 0, tzinfo=UTC) # noqa: SLF001
|
|
|
|
time_service = Mock()
|
|
time_service.get_local_date.return_value = datetime(2025, 11, 22, tzinfo=UTC).date() # Today is 22nd
|
|
time_service.get_expected_intervals_for_day.return_value = 96
|
|
coordinator.time = time_service
|
|
|
|
# Test 1: Connection state
|
|
connection_state = get_connection_state(coordinator)
|
|
assert connection_state is True, "Connection must be True when using cache"
|
|
|
|
# Test 2: Tomorrow data available state
|
|
tomorrow_desc = BinarySensorEntityDescription(
|
|
key="tomorrow_data_available",
|
|
name="Tomorrow Data Available",
|
|
)
|
|
tomorrow_sensor = TibberPricesBinarySensor(coordinator, tomorrow_desc)
|
|
tomorrow_state = tomorrow_sensor._tomorrow_data_available_state() # noqa: SLF001
|
|
assert tomorrow_state is True, "Tomorrow data should check cached data normally"
|
|
|
|
# Test 3: Lifecycle state
|
|
lifecycle_calc = TibberPricesLifecycleCalculator(coordinator)
|
|
lifecycle_state = lifecycle_calc.get_lifecycle_state()
|
|
assert lifecycle_state == "error", "Lifecycle must be 'error' when last fetch failed"
|
|
|
|
|
|
def test_combined_states_normal_operation_scenario() -> None:
|
|
"""
|
|
Integration test: Verify all 3 sensors show consistent states during normal operation.
|
|
|
|
Scenario: No errors, data available
|
|
Expected:
|
|
- connection: True (on)
|
|
- tomorrow_data_available: True/False (checks data)
|
|
- lifecycle_status: "cached" or "fresh"
|
|
"""
|
|
# Setup coordinator in normal operation
|
|
coordinator = create_mock_coordinator()
|
|
coordinator.data = create_coordinator_data(today=True, tomorrow=True)
|
|
coordinator.last_exception = None
|
|
coordinator._is_fetching = False # noqa: SLF001
|
|
coordinator._last_price_update = datetime(2025, 11, 22, 10, 0, 0, tzinfo=UTC) # noqa: SLF001 - 10 minutes ago
|
|
|
|
# Mock time (10 minutes after last update = "cached" state)
|
|
now = datetime(2025, 11, 22, 10, 10, 0, tzinfo=UTC)
|
|
time_service = Mock()
|
|
time_service.now.return_value = now
|
|
time_service.as_local.return_value = now
|
|
time_service.get_local_date.return_value = datetime(2025, 11, 22, tzinfo=UTC).date() # Today is 22nd
|
|
time_service.get_expected_intervals_for_day.return_value = 96
|
|
coordinator.time = time_service
|
|
|
|
# Test 1: Connection state
|
|
connection_state = get_connection_state(coordinator)
|
|
assert connection_state is True, "Connection must be True during normal operation"
|
|
|
|
# Test 2: Tomorrow data available state
|
|
tomorrow_desc = BinarySensorEntityDescription(
|
|
key="tomorrow_data_available",
|
|
name="Tomorrow Data Available",
|
|
)
|
|
tomorrow_sensor = TibberPricesBinarySensor(coordinator, tomorrow_desc)
|
|
tomorrow_state = tomorrow_sensor._tomorrow_data_available_state() # noqa: SLF001
|
|
assert tomorrow_state is True, "Tomorrow data should be available"
|
|
|
|
# Test 3: Lifecycle state
|
|
lifecycle_calc = TibberPricesLifecycleCalculator(coordinator)
|
|
lifecycle_state = lifecycle_calc.get_lifecycle_state()
|
|
assert lifecycle_state == "cached", "Lifecycle should be 'cached' (>5min since fetch)"
|