test: remove obsolete lifecycle callback tests

Removed tests for the lifecycle callback system that was removed in
commit 48d6e25.

Also fixed commit f373c01 which incorrectly added test_lifecycle_tomorrow_update.py
instead of deleting it - this commit properly removes it.

Changes:
- tests/test_chart_data_push_updates.py: Deleted (235 lines)
- tests/test_lifecycle_tomorrow_update.py: Deleted (174 lines)
- tests/test_resource_cleanup.py: Removed lifecycle callback test method

Impact: Test suite now has 343 tests (down from 349). All tests pass.
No functionality affected - only test cleanup.
This commit is contained in:
Julian Pawlowski 2025-11-22 13:04:47 +00:00
parent 2d0febdab3
commit 32857c0cc0
3 changed files with 0 additions and 430 deletions

View file

@ -1,235 +0,0 @@
"""
Test chart_data_export sensor receives push updates from lifecycle changes.
This test verifies that when new price data arrives from the API (lifecycle
state changes to "fresh"), the chart_data_export sensor is immediately refreshed
via push update, not waiting for the next coordinator polling cycle.
"""
from __future__ import annotations
from typing import TYPE_CHECKING
from unittest.mock import AsyncMock, Mock
import pytest
from custom_components.tibber_prices.sensor.core import TibberPricesSensor
if TYPE_CHECKING:
from collections.abc import Callable
@pytest.mark.unit
def test_chart_data_export_registers_lifecycle_callback() -> None:
"""
Test that chart_data_export sensor registers for lifecycle push updates.
When chart_data_export sensor is created, it should register a callback
with the coordinator to receive immediate notifications when lifecycle
state changes (e.g., new API data arrives).
"""
# Create mock coordinator with register_lifecycle_callback method
mock_coordinator = Mock()
mock_coordinator.register_lifecycle_callback = Mock(return_value=Mock()) # Returns unregister callable
mock_coordinator.data = {"priceInfo": {}}
mock_coordinator.config_entry = Mock()
mock_coordinator.config_entry.entry_id = "test_entry"
# Create mock entity description for chart_data_export
mock_entity_description = Mock()
mock_entity_description.key = "chart_data_export"
mock_entity_description.translation_key = "chart_data_export"
# Create sensor instance
sensor = TibberPricesSensor(
coordinator=mock_coordinator,
entity_description=mock_entity_description,
)
# Verify lifecycle callback was registered
mock_coordinator.register_lifecycle_callback.assert_called_once()
# Verify the callback is stored for cleanup
assert sensor._lifecycle_remove_listener is not None # noqa: SLF001 - Test accesses internal state
@pytest.mark.unit
def test_data_lifecycle_status_registers_lifecycle_callback() -> None:
"""
Test that data_lifecycle_status sensor also registers for lifecycle push updates.
This is the original behavior - lifecycle sensor should still register.
"""
# Create mock coordinator
mock_coordinator = Mock()
mock_coordinator.register_lifecycle_callback = Mock(return_value=Mock())
mock_coordinator.data = {"priceInfo": {}}
mock_coordinator.config_entry = Mock()
mock_coordinator.config_entry.entry_id = "test_entry"
# Create mock entity description for data_lifecycle_status
mock_entity_description = Mock()
mock_entity_description.key = "data_lifecycle_status"
mock_entity_description.translation_key = "data_lifecycle_status"
# Create sensor instance
sensor = TibberPricesSensor(
coordinator=mock_coordinator,
entity_description=mock_entity_description,
)
# Verify lifecycle callback was registered
mock_coordinator.register_lifecycle_callback.assert_called_once()
# Verify the callback is stored for cleanup
assert sensor._lifecycle_remove_listener is not None # noqa: SLF001 - Test accesses internal state
@pytest.mark.unit
def test_other_sensors_do_not_register_lifecycle_callback() -> None:
"""
Test that other sensors (not lifecycle or chart_data_export) don't register lifecycle callbacks.
Only data_lifecycle_status and chart_data_export should register for push updates.
"""
# Create mock coordinator
mock_coordinator = Mock()
mock_coordinator.register_lifecycle_callback = Mock(return_value=Mock())
mock_coordinator.data = {"priceInfo": {}}
mock_coordinator.config_entry = Mock()
mock_coordinator.config_entry.entry_id = "test_entry"
# Create mock entity description for a regular sensor
mock_entity_description = Mock()
mock_entity_description.key = "current_interval_price"
mock_entity_description.translation_key = "current_interval_price"
# Create sensor instance
sensor = TibberPricesSensor(
coordinator=mock_coordinator,
entity_description=mock_entity_description,
)
# Verify lifecycle callback was NOT registered
mock_coordinator.register_lifecycle_callback.assert_not_called()
# Verify no lifecycle listener is stored
assert sensor._lifecycle_remove_listener is None # noqa: SLF001 - Test accesses internal state
@pytest.mark.unit
@pytest.mark.asyncio
async def test_chart_data_lifecycle_callback_refreshes_data() -> None:
"""
Test that lifecycle callback for chart_data_export triggers data refresh.
When coordinator notifies lifecycle change (e.g., new API data arrives),
the chart_data_export sensor should immediately refresh its data by calling
the chart data service.
"""
# Create mock hass
mock_hass = Mock()
mock_hass.async_create_task = Mock()
# Create mock coordinator
mock_coordinator = Mock()
mock_coordinator.data = {"priceInfo": {}}
mock_coordinator.hass = mock_hass
mock_coordinator.config_entry = Mock()
mock_coordinator.config_entry.entry_id = "test_entry"
# Track registered callbacks
registered_callbacks: list[Callable] = []
def mock_register_callback(callback: Callable) -> Callable:
"""Mock register that stores the callback."""
registered_callbacks.append(callback)
return Mock() # Return unregister callable
mock_coordinator.register_lifecycle_callback = mock_register_callback
# Create mock entity description for chart_data_export
mock_entity_description = Mock()
mock_entity_description.key = "chart_data_export"
mock_entity_description.translation_key = "chart_data_export"
# Create sensor instance
sensor = TibberPricesSensor(
coordinator=mock_coordinator,
entity_description=mock_entity_description,
)
# Assign hass to sensor (normally done by HA)
sensor.hass = mock_hass
# Verify callback was registered
assert len(registered_callbacks) == 1
lifecycle_callback = registered_callbacks[0]
# Mock _refresh_chart_data to avoid actual service call
sensor._refresh_chart_data = AsyncMock() # noqa: SLF001 - Test accesses internal method
# Trigger lifecycle callback (simulating coordinator notification)
lifecycle_callback()
# Verify hass.async_create_task was called (callback schedules async refresh)
mock_hass.async_create_task.assert_called_once()
# Get the task that was scheduled
scheduled_task = mock_hass.async_create_task.call_args[0][0]
# Execute the scheduled task
await scheduled_task
# Verify _refresh_chart_data was called
sensor._refresh_chart_data.assert_called_once() # noqa: SLF001 - Test accesses internal method
@pytest.mark.unit
@pytest.mark.asyncio
async def test_lifecycle_callback_cleanup_on_remove() -> None:
"""
Test that lifecycle callback is properly unregistered when sensor is removed.
When chart_data_export sensor is removed from HA, the lifecycle callback
should be unregistered to prevent memory leaks.
"""
# Create mock hass
mock_hass = Mock()
# Create mock coordinator
mock_coordinator = Mock()
mock_coordinator.data = {"priceInfo": {}}
mock_coordinator.hass = mock_hass
mock_coordinator.config_entry = Mock()
mock_coordinator.config_entry.entry_id = "test_entry"
# Track unregister callable
unregister_mock = Mock()
mock_coordinator.register_lifecycle_callback = Mock(return_value=unregister_mock)
# Create mock entity description for chart_data_export
mock_entity_description = Mock()
mock_entity_description.key = "chart_data_export"
mock_entity_description.translation_key = "chart_data_export"
# Create sensor instance
sensor = TibberPricesSensor(
coordinator=mock_coordinator,
entity_description=mock_entity_description,
)
# Assign hass to sensor
sensor.hass = mock_hass
# Verify callback was registered
assert sensor._lifecycle_remove_listener is not None # noqa: SLF001 - Test accesses internal state
# Remove sensor from hass (trigger cleanup)
await sensor.async_will_remove_from_hass()
# Verify unregister callable was called
unregister_mock.assert_called_once()
# Verify lifecycle listener is cleared
assert sensor._lifecycle_remove_listener is None # noqa: SLF001 - Test accesses internal state

View file

@ -1,174 +0,0 @@
"""
Test lifecycle sensor update after tomorrow data fetch.
This test ensures that when Timer #1 fetches tomorrow data after 13:00,
the lifecycle sensor correctly shows the new data (tomorrow_available=true)
and not stale attributes from before the fetch.
"""
from __future__ import annotations
import asyncio
from datetime import datetime, timedelta
from typing import Any
from unittest.mock import Mock
from zoneinfo import ZoneInfo
import pytest
from custom_components.tibber_prices.coordinator.core import (
TibberPricesDataUpdateCoordinator,
)
@pytest.mark.unit
@pytest.mark.asyncio
async def test_lifecycle_sensor_updates_after_tomorrow_fetch() -> None:
"""
Test that lifecycle sensor shows fresh tomorrow data after Timer #1 fetch.
Scenario:
1. Time is 13:05 (after tomorrow data expected)
2. Coordinator fetches new data with tomorrow prices
3. Lifecycle state changes to "fresh"
4. Lifecycle sensor updates should see NEW coordinator.data (with tomorrow)
Bug fixed: Previously lifecycle callbacks were called BEFORE coordinator.data
was set, causing lifecycle sensor to show tomorrow_available=false even though
tomorrow data was just fetched.
"""
# Create mock hass
mock_hass = Mock()
mock_hass.async_create_task = Mock()
# Setup mock coordinator
coordinator = Mock(spec=TibberPricesDataUpdateCoordinator)
coordinator.hass = mock_hass
coordinator.time = Mock()
current_time = datetime(2025, 11, 22, 13, 5, 0, tzinfo=ZoneInfo("Europe/Oslo"))
coordinator.time.now.return_value = current_time
coordinator.time.as_local.side_effect = lambda dt: dt
# Initial state: no tomorrow data
coordinator.data = {
"priceInfo": {
"today": [{"startsAt": "2025-11-22T00:00:00+01:00", "total": 0.30}],
"tomorrow": [], # Empty - no tomorrow data yet
}
}
coordinator._cached_price_data = coordinator.data["priceInfo"] # noqa: SLF001
coordinator._lifecycle_state = "cached" # noqa: SLF001
coordinator._last_price_update = current_time - timedelta(hours=1) # noqa: SLF001
# Mock lifecycle callbacks list
lifecycle_callbacks = []
coordinator._lifecycle_callbacks = lifecycle_callbacks # noqa: SLF001
coordinator.register_lifecycle_callback = lambda cb: lifecycle_callbacks.append(cb)
# Create a mock sensor that tracks when it's updated
sensor_update_count = {"count": 0, "saw_tomorrow": False}
def mock_sensor_update() -> None:
"""Mock sensor update that checks coordinator.data."""
sensor_update_count["count"] += 1
# Check if sensor sees tomorrow data in coordinator.data
if coordinator.data and coordinator.data["priceInfo"]["tomorrow"]:
sensor_update_count["saw_tomorrow"] = True
# Register mock sensor as lifecycle callback
lifecycle_callbacks.append(mock_sensor_update)
# Simulate data fetch with tomorrow prices (simulates Timer #1 after 13:00)
new_data = {
"priceInfo": {
"today": [{"startsAt": "2025-11-22T00:00:00+01:00", "total": 0.30}],
"tomorrow": [ # NEW tomorrow data
{"startsAt": "2025-11-23T00:00:00+01:00", "total": 0.28},
{"startsAt": "2025-11-23T01:00:00+01:00", "total": 0.27},
],
}
}
# Update coordinator internal state (simulates what _async_update_data does)
coordinator._cached_price_data = new_data["priceInfo"] # noqa: SLF001
coordinator._last_price_update = current_time # noqa: SLF001 - New timestamp
coordinator._lifecycle_state = "fresh" # noqa: SLF001
# CRITICAL: Set coordinator.data BEFORE calling lifecycle callbacks
# This simulates what DataUpdateCoordinator framework does after _async_update_data returns
coordinator.data = new_data
# Simulate the fixed _notify_lifecycle_after_update() behavior
# In the real code, this is scheduled as a task with asyncio.sleep(0)
# to ensure it runs AFTER framework sets coordinator.data
async def simulate_lifecycle_update() -> None:
"""Simulate the fixed lifecycle update behavior."""
# Yield to event loop (simulates asyncio.sleep(0))
await asyncio.sleep(0)
# Now call callbacks - they should see NEW coordinator.data
for callback in lifecycle_callbacks:
callback()
# Run the lifecycle update
await simulate_lifecycle_update()
# Verify sensor was updated
assert sensor_update_count["count"] == 1, "Lifecycle callback should be called once"
# CRITICAL: Verify sensor saw the NEW tomorrow data
assert sensor_update_count["saw_tomorrow"], (
"Lifecycle sensor should see tomorrow data in coordinator.data (not stale data from before fetch)"
)
@pytest.mark.unit
@pytest.mark.asyncio
async def test_lifecycle_callback_guard_before_entity_added() -> None:
"""
Test that lifecycle callback handles being called before entity is added to hass.
This tests the guard in _handle_lifecycle_update_for_chart() that prevents
AttributeError when self.hass is None.
"""
# Create mock hass
mock_hass = Mock()
tasks_created = []
def mock_create_task(coro: Any) -> Mock:
"""Track created tasks."""
tasks_created.append(coro)
return Mock()
mock_hass.async_create_task = mock_create_task
# Create a mock entity that simulates chart_data_export sensor
entity = Mock()
entity.hass = None # Not yet added to Home Assistant
# Mock the _refresh_chart_data method as a Mock (not real coroutine)
# This avoids "coroutine never awaited" warnings in test
entity._refresh_chart_data = Mock(return_value=Mock()) # noqa: SLF001
# Simulate the callback being called before entity is added
# This should NOT crash with AttributeError
def callback() -> None:
"""Simulate _handle_lifecycle_update_for_chart with guard."""
if entity.hass is None:
return # Guard: Don't schedule task if not added yet
entity.hass.async_create_task(entity._refresh_chart_data()) # noqa: SLF001
# Call callback - should not crash
callback()
# Verify NO task was created (entity not added yet)
assert len(tasks_created) == 0, "No task should be created when entity.hass is None"
# Now simulate entity being added to hass
entity.hass = mock_hass
# Call callback again - should now schedule task
callback()
# Verify task was created this time
assert len(tasks_created) == 1, "Task should be created after entity is added to hass"

View file

@ -64,27 +64,6 @@ class TestListenerCleanup:
assert callback not in manager._minute_update_listeners # noqa: SLF001
assert len(manager._minute_update_listeners) == 0 # noqa: SLF001
def test_coordinator_removes_lifecycle_callbacks(self) -> None:
"""Test that lifecycle callbacks can be unregistered."""
# Create mock coordinator
coordinator = object.__new__(TibberPricesDataUpdateCoordinator)
coordinator._lifecycle_callbacks = [] # noqa: SLF001
# Register a callback
callback = Mock()
unregister_fn = coordinator.register_lifecycle_callback(callback)
# Verify callback was registered
assert callback in coordinator._lifecycle_callbacks # noqa: SLF001
assert len(coordinator._lifecycle_callbacks) == 1 # noqa: SLF001
# Unregister callback
unregister_fn()
# Verify callback was removed
assert callback not in coordinator._lifecycle_callbacks # noqa: SLF001
assert len(coordinator._lifecycle_callbacks) == 0 # noqa: SLF001
@pytest.mark.asyncio
async def test_sensor_cleanup_pattern_exists(self) -> None:
"""