fix midnight turnover

This commit is contained in:
Julian Pawlowski 2025-11-02 23:27:44 +00:00
parent ef05929247
commit 1b452b72fb
3 changed files with 286 additions and 5 deletions

View file

@ -271,12 +271,88 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
if last_user_update := stored.get("last_user_update"):
self._last_user_update = dt_util.parse_datetime(last_user_update)
_LOGGER.debug("Cache loaded successfully")
# Validate cache: check if price data is from a previous day
if not self._is_cache_valid():
_LOGGER.info("Cached price data is from a previous day, clearing cache to fetch fresh data")
self._cached_price_data = None
self._last_price_update = None
await self._store_cache()
else:
_LOGGER.debug("Cache loaded successfully")
else:
_LOGGER.debug("No cache found, will fetch fresh data")
except OSError as ex:
_LOGGER.warning("Failed to load cache: %s", ex)
def _is_cache_valid(self) -> bool:
"""
Validate if cached price data is still current.
Returns False if:
- No cached data exists
- Cached data is from a different calendar day (in local timezone)
- Midnight turnover has occurred since cache was saved
"""
if self._cached_price_data is None or self._last_price_update is None:
return False
current_local_date = dt_util.as_local(dt_util.now()).date()
last_update_local_date = dt_util.as_local(self._last_price_update).date()
if current_local_date != last_update_local_date:
_LOGGER.debug(
"Cache date mismatch: cached=%s, current=%s",
last_update_local_date,
current_local_date,
)
return False
return True
def _perform_midnight_turnover(self, price_info: dict[str, Any]) -> dict[str, Any]:
"""
Perform midnight turnover on price data.
Moves: today yesterday, tomorrow today, clears tomorrow.
This handles cases where:
- Server was running through midnight
- Cache is being refreshed and needs proper day rotation
Args:
price_info: The price info dict with 'today', 'tomorrow', 'yesterday' keys
Returns:
Updated price_info with rotated day data
"""
current_local_date = dt_util.as_local(dt_util.now()).date()
# Extract current data
today_prices = price_info.get("today", [])
tomorrow_prices = price_info.get("tomorrow", [])
# Check if any of today's prices are from the previous day
prices_need_rotation = False
if today_prices:
first_today_price_str = today_prices[0].get("startsAt")
if first_today_price_str:
first_today_price_time = dt_util.parse_datetime(first_today_price_str)
if first_today_price_time:
first_today_price_date = dt_util.as_local(first_today_price_time).date()
prices_need_rotation = first_today_price_date < current_local_date
if prices_need_rotation:
_LOGGER.info("Performing midnight turnover: today→yesterday, tomorrow→today")
return {
"yesterday": today_prices,
"today": tomorrow_prices,
"tomorrow": [],
}
return price_info
async def _store_cache(self) -> None:
"""Store cache data."""
data = {
@ -345,6 +421,9 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
first_home_data = next(iter(homes_data.values()))
price_info = first_home_data.get("price_info", {})
# Perform midnight turnover if needed (handles day transitions)
price_info = self._perform_midnight_turnover(price_info)
# Get threshold percentages for enrichment
thresholds = self._get_threshold_percentages()
@ -378,6 +457,9 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
price_info = home_data.get("price_info", {})
# Perform midnight turnover if needed (handles day transitions)
price_info = self._perform_midnight_turnover(price_info)
# Get threshold percentages for enrichment
thresholds = self._get_threshold_percentages()

View file

@ -373,11 +373,32 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity):
return None
price_info = self.coordinator.data.get("priceInfo", {})
today_prices = price_info.get("today", [])
if not today_prices:
return None
prices = [float(price["total"]) for price in today_prices]
# Get local midnight boundaries
local_midnight = dt_util.as_local(dt_util.start_of_local_day(dt_util.now()))
local_midnight_tomorrow = local_midnight + timedelta(days=1)
# Collect all prices from both today and tomorrow data that fall within local today
prices = []
for day_key in ["today", "tomorrow"]:
for price_data in price_info.get(day_key, []):
starts_at_str = price_data.get("startsAt")
if not starts_at_str:
continue
starts_at = dt_util.parse_datetime(starts_at_str)
if starts_at is None:
continue
# Convert to local timezone for comparison
starts_at = dt_util.as_local(starts_at)
# Include price if it starts within today's local date boundaries
if local_midnight <= starts_at < local_midnight_tomorrow:
total_price = price_data.get("total")
if total_price is not None:
prices.append(float(total_price))
if not prices:
return None

View file

@ -0,0 +1,178 @@
"""Test midnight turnover logic - focused unit tests."""
from __future__ import annotations
from datetime import datetime, timedelta
from unittest.mock import Mock, patch
def generate_price_intervals(
start_date: datetime,
num_intervals: int = 96,
base_price: float = 0.20,
) -> list[dict]:
"""Generate realistic price intervals for a day."""
intervals = []
current_time = start_date.replace(hour=0, minute=0, second=0, microsecond=0)
for i in range(num_intervals):
intervals.append(
{
"startsAt": current_time.isoformat(),
"total": base_price + (i * 0.001),
"level": "NORMAL",
}
)
current_time += timedelta(minutes=15)
return intervals
def test_midnight_turnover_with_stale_today_data() -> None:
"""Test midnight turnover when today's data is from the previous day."""
from custom_components.tibber_prices.coordinator import TibberPricesDataUpdateCoordinator
coordinator = Mock(spec=TibberPricesDataUpdateCoordinator)
coordinator._perform_midnight_turnover = TibberPricesDataUpdateCoordinator._perform_midnight_turnover.__get__(
coordinator, TibberPricesDataUpdateCoordinator
)
today_local = datetime(2025, 11, 2, 14, 30)
yesterday_prices = generate_price_intervals(
datetime(2025, 11, 1, 0, 0),
num_intervals=96,
)
tomorrow_prices = generate_price_intervals(
datetime(2025, 11, 3, 0, 0),
num_intervals=96,
)
price_info = {
"yesterday": [],
"today": yesterday_prices,
"tomorrow": tomorrow_prices,
}
with patch("custom_components.tibber_prices.coordinator.dt_util") as mock_dt_util:
mock_dt_util.as_local.side_effect = lambda dt: dt if dt else datetime(2025, 11, 2)
mock_dt_util.now.return_value = today_local
mock_dt_util.parse_datetime.side_effect = lambda s: (datetime.fromisoformat(s) if s else None)
rotated = coordinator._perform_midnight_turnover(price_info)
assert len(rotated["yesterday"]) == 96
assert rotated["yesterday"][0]["startsAt"].startswith("2025-11-01")
assert len(rotated["today"]) == 96
assert rotated["today"][0]["startsAt"].startswith("2025-11-03")
assert len(rotated["tomorrow"]) == 0
def test_midnight_turnover_no_rotation_needed() -> None:
"""Test that turnover skips rotation when data is already current."""
from custom_components.tibber_prices.coordinator import TibberPricesDataUpdateCoordinator
coordinator = Mock(spec=TibberPricesDataUpdateCoordinator)
coordinator._perform_midnight_turnover = TibberPricesDataUpdateCoordinator._perform_midnight_turnover.__get__(
coordinator, TibberPricesDataUpdateCoordinator
)
today_local = datetime(2025, 11, 2, 14, 30)
today_prices = generate_price_intervals(
datetime(2025, 11, 2, 0, 0),
num_intervals=96,
)
tomorrow_prices = generate_price_intervals(
datetime(2025, 11, 3, 0, 0),
num_intervals=96,
)
price_info = {
"yesterday": [],
"today": today_prices,
"tomorrow": tomorrow_prices,
}
with patch("custom_components.tibber_prices.coordinator.dt_util") as mock_dt_util:
mock_dt_util.as_local.side_effect = lambda dt: dt if dt else datetime(2025, 11, 2)
mock_dt_util.now.return_value = today_local
mock_dt_util.parse_datetime.side_effect = lambda s: (datetime.fromisoformat(s) if s else None)
rotated = coordinator._perform_midnight_turnover(price_info)
assert rotated == price_info
assert rotated["today"][0]["startsAt"].startswith("2025-11-02")
def test_scenario_missed_midnight_recovery() -> None:
"""Scenario: Server was down at midnight, comes back online later."""
from custom_components.tibber_prices.coordinator import TibberPricesDataUpdateCoordinator
coordinator = Mock(spec=TibberPricesDataUpdateCoordinator)
coordinator._perform_midnight_turnover = TibberPricesDataUpdateCoordinator._perform_midnight_turnover.__get__(
coordinator, TibberPricesDataUpdateCoordinator
)
yesterday_prices = generate_price_intervals(datetime(2025, 11, 1, 0, 0))
tomorrow_prices = generate_price_intervals(datetime(2025, 11, 2, 0, 0))
price_info = {
"yesterday": [],
"today": yesterday_prices,
"tomorrow": tomorrow_prices,
}
with patch("custom_components.tibber_prices.coordinator.dt_util") as mock_dt_util:
current_local = datetime(2025, 11, 2, 14, 0)
mock_dt_util.as_local.side_effect = lambda dt: (dt if isinstance(dt, datetime) else current_local)
mock_dt_util.now.return_value = current_local
mock_dt_util.parse_datetime.side_effect = lambda s: (datetime.fromisoformat(s) if s else None)
rotated = coordinator._perform_midnight_turnover(price_info)
assert len(rotated["yesterday"]) == 96
assert rotated["yesterday"][0]["startsAt"].startswith("2025-11-01")
assert len(rotated["today"]) == 96
assert rotated["today"][0]["startsAt"].startswith("2025-11-02")
assert len(rotated["tomorrow"]) == 0
def test_scenario_normal_daily_refresh() -> None:
"""Scenario: Normal daily refresh at 5 AM (all data is current)."""
from custom_components.tibber_prices.coordinator import TibberPricesDataUpdateCoordinator
coordinator = Mock(spec=TibberPricesDataUpdateCoordinator)
coordinator._perform_midnight_turnover = TibberPricesDataUpdateCoordinator._perform_midnight_turnover.__get__(
coordinator, TibberPricesDataUpdateCoordinator
)
today_prices = generate_price_intervals(datetime(2025, 11, 2, 0, 0))
tomorrow_prices = generate_price_intervals(datetime(2025, 11, 3, 0, 0))
price_info = {
"yesterday": [],
"today": today_prices,
"tomorrow": tomorrow_prices,
}
with patch("custom_components.tibber_prices.coordinator.dt_util") as mock_dt_util:
current_local = datetime(2025, 11, 2, 5, 0)
mock_dt_util.as_local.side_effect = lambda dt: (dt if isinstance(dt, datetime) else current_local)
mock_dt_util.now.return_value = current_local
mock_dt_util.parse_datetime.side_effect = lambda s: (datetime.fromisoformat(s) if s else None)
rotated = coordinator._perform_midnight_turnover(price_info)
assert len(rotated["today"]) == 96
assert rotated["today"][0]["startsAt"].startswith("2025-11-02")
assert len(rotated["tomorrow"]) == 96
assert rotated["tomorrow"][0]["startsAt"].startswith("2025-11-03")