fix(interval_pool): prevent external mutation of cached intervals

Return shallow copies from _get_cached_intervals() to prevent external
code (e.g., parse_all_timestamps()) from mutating the Pool's internal cache.
This fixes a TypeError in check_coverage() caused by datetime objects
ending up in cached interval dicts.

Additional improvements:
- Add TimeService support for time-travel testing in cache/manager
- Normalize startsAt to consistent format (handles datetime vs string)
- Rename detect_gaps() → check_coverage() for clarity
- Add get_sensor_data() for sensor data fetching with fetch/return separation
- Add get_pool_stats() for lifecycle sensor metrics

Impact: Fixes critical cache mutation bug, enables time-travel testing,
improves pool API for sensor integration.
This commit is contained in:
Julian Pawlowski 2025-12-23 14:13:24 +00:00
parent 94615dc6cd
commit 7adc56bf79
4 changed files with 291 additions and 35 deletions

View file

@ -4,10 +4,15 @@ from __future__ import annotations
import logging
from datetime import datetime, timedelta
from typing import Any
from typing import TYPE_CHECKING, Any
from homeassistant.util import dt as dt_utils
if TYPE_CHECKING:
from custom_components.tibber_prices.coordinator.time_service import (
TibberPricesTimeService,
)
_LOGGER = logging.getLogger(__name__)
_LOGGER_DETAILS = logging.getLogger(__name__ + ".details")
@ -37,9 +42,10 @@ class TibberPricesIntervalPoolFetchGroupCache:
Protected: 2025-11-23 00:00 to 2025-11-27 00:00
"""
def __init__(self) -> None:
"""Initialize empty fetch group cache."""
def __init__(self, *, time_service: TibberPricesTimeService | None = None) -> None:
"""Initialize empty fetch group cache with optional TimeService."""
self._fetch_groups: list[dict[str, Any]] = []
self._time_service = time_service
# Protected range cache (invalidated daily)
self._protected_range_cache: tuple[str, str] | None = None
@ -93,6 +99,11 @@ class TibberPricesIntervalPoolFetchGroupCache:
Protected range: day-before-yesterday 00:00 to day-after-tomorrow 00:00.
This range shifts daily automatically.
Time Machine Support:
If time_service was provided at init, uses time_service.now() for
"today" calculation. This protects the correct date range when
simulating a different date.
Returns:
Tuple of (start_iso, end_iso) for protected range.
Start is inclusive, end is exclusive.
@ -102,10 +113,11 @@ class TibberPricesIntervalPoolFetchGroupCache:
Protected days: 2025-11-23, 2025-11-24, 2025-11-25, 2025-11-26
"""
# Check cache validity (invalidate daily)
now = dt_utils.now()
# Use TimeService if available (Time Machine support), else real time
now = self._time_service.now() if self._time_service else dt_utils.now()
today_date_str = now.date().isoformat()
# Check cache validity (invalidate daily)
if self._protected_range_cache_date == today_date_str and self._protected_range_cache:
return self._protected_range_cache

View file

@ -1,4 +1,4 @@
"""Interval fetcher - gap detection and API coordination for interval pool."""
"""Interval fetcher - coverage check and API coordination for interval pool."""
from __future__ import annotations
@ -38,7 +38,7 @@ TIME_TOLERANCE_MINUTES = 1
class TibberPricesIntervalPoolFetcher:
"""Fetch missing intervals from API based on gap detection."""
"""Fetch missing intervals from API based on coverage check."""
def __init__(
self,
@ -62,14 +62,14 @@ class TibberPricesIntervalPoolFetcher:
self._index = index
self._home_id = home_id
def detect_gaps(
def check_coverage(
self,
cached_intervals: list[dict[str, Any]],
start_time_iso: str,
end_time_iso: str,
) -> list[tuple[str, str]]:
"""
Detect missing time ranges that need to be fetched.
Check cache coverage and find missing time ranges.
This method minimizes API calls by:
1. Finding all gaps in cached intervals
@ -130,7 +130,7 @@ class TibberPricesIntervalPoolFetcher:
if time_diff_before_first > TIME_TOLERANCE_SECONDS:
missing_ranges.append((start_time_iso, sorted_intervals[0]["startsAt"]))
_LOGGER_DETAILS.debug(
"Gap before first cached interval: %s to %s (%.1f seconds)",
"Missing range before first cached interval: %s to %s (%.1f seconds)",
start_time_iso,
sorted_intervals[0]["startsAt"],
time_diff_before_first,
@ -163,7 +163,7 @@ class TibberPricesIntervalPoolFetcher:
current_interval_end = current_dt + timedelta(minutes=expected_interval_minutes)
missing_ranges.append((current_interval_end.isoformat(), next_start))
_LOGGER_DETAILS.debug(
"Gap between cached intervals: %s (ends at %s) to %s (%.1f min gap, expected %d min)",
"Missing range between cached intervals: %s (ends at %s) to %s (%.1f min, expected %d min)",
current_start,
current_interval_end.isoformat(),
next_start,
@ -190,7 +190,7 @@ class TibberPricesIntervalPoolFetcher:
# Missing range starts AFTER the last cached interval ends
missing_ranges.append((last_interval_end_dt.isoformat(), end_time_iso))
_LOGGER_DETAILS.debug(
"Gap after last cached interval: %s (ends at %s) to %s (%.1f seconds, need >= %d)",
"Missing range after last cached interval: %s (ends at %s) to %s (%.1f seconds, need >= %d)",
sorted_intervals[-1]["startsAt"],
last_interval_end_dt.isoformat(),
end_time_iso,
@ -200,7 +200,7 @@ class TibberPricesIntervalPoolFetcher:
if not missing_ranges:
_LOGGER.debug(
"No gaps detected - all intervals cached for range %s to %s",
"Full coverage - all intervals cached for range %s to %s",
start_time_iso,
end_time_iso,
)
@ -285,7 +285,7 @@ class TibberPricesIntervalPoolFetcher:
for idx, (missing_start_iso, missing_end_iso) in enumerate(missing_ranges, start=1):
_LOGGER_DETAILS.debug(
"API call %d/%d for home %s: fetching range %s to %s",
"Fetching from Tibber API (%d/%d) for home %s: range %s to %s",
idx,
len(missing_ranges),
self._home_id,
@ -309,10 +309,9 @@ class TibberPricesIntervalPoolFetcher:
all_fetched_intervals.append(fetched_intervals)
_LOGGER_DETAILS.debug(
"Fetched %d intervals from API for home %s (fetch time: %s)",
"Received %d intervals from Tibber API for home %s",
len(fetched_intervals),
self._home_id,
fetch_time_iso,
)
# Notify callback if provided (for immediate caching)

View file

@ -3,6 +3,7 @@
from __future__ import annotations
import logging
from datetime import datetime
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
@ -17,6 +18,13 @@ _LOGGER_DETAILS = logging.getLogger(__name__ + ".details")
MAX_CACHE_SIZE = 960
def _normalize_starts_at(starts_at: datetime | str) -> str:
"""Normalize startsAt to consistent format (YYYY-MM-DDTHH:MM:SS)."""
if isinstance(starts_at, datetime):
return starts_at.strftime("%Y-%m-%dT%H:%M:%S")
return starts_at[:19]
class TibberPricesIntervalPoolGarbageCollector:
"""
Manages cache eviction and dead interval cleanup.
@ -173,7 +181,7 @@ class TibberPricesIntervalPoolGarbageCollector:
living_intervals = []
for interval_idx, interval in enumerate(old_intervals):
starts_at_normalized = interval["startsAt"][:19]
starts_at_normalized = _normalize_starts_at(interval["startsAt"])
index_entry = self._index.get(starts_at_normalized)
if index_entry is not None:

View file

@ -7,18 +7,22 @@ import contextlib
import logging
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Any
from zoneinfo import ZoneInfo
from custom_components.tibber_prices.api.exceptions import TibberPricesApiClientError
from homeassistant.util import dt as dt_utils
from .cache import TibberPricesIntervalPoolFetchGroupCache
from .fetcher import TibberPricesIntervalPoolFetcher
from .garbage_collector import TibberPricesIntervalPoolGarbageCollector
from .garbage_collector import MAX_CACHE_SIZE, TibberPricesIntervalPoolGarbageCollector
from .index import TibberPricesIntervalPoolTimestampIndex
from .storage import async_save_pool_state
if TYPE_CHECKING:
from custom_components.tibber_prices.api.client import TibberPricesApiClient
from custom_components.tibber_prices.coordinator.time_service import (
TibberPricesTimeService,
)
_LOGGER = logging.getLogger(__name__)
_LOGGER_DETAILS = logging.getLogger(__name__ + ".details")
@ -31,6 +35,13 @@ INTERVAL_QUARTER_HOURLY = 15
DEBOUNCE_DELAY_SECONDS = 3.0
def _normalize_starts_at(starts_at: datetime | str) -> str:
"""Normalize startsAt to consistent format (YYYY-MM-DDTHH:MM:SS)."""
if isinstance(starts_at, datetime):
return starts_at.strftime("%Y-%m-%dT%H:%M:%S")
return starts_at[:19]
class TibberPricesIntervalPool:
"""
High-performance interval cache manager for a single Tibber home.
@ -71,6 +82,7 @@ class TibberPricesIntervalPool:
api: TibberPricesApiClient,
hass: Any | None = None,
entry_id: str | None = None,
time_service: TibberPricesTimeService | None = None,
) -> None:
"""
Initialize interval pool manager.
@ -80,12 +92,15 @@ class TibberPricesIntervalPool:
api: API client for fetching intervals.
hass: HomeAssistant instance for auto-save (optional).
entry_id: Config entry ID for auto-save (optional).
time_service: TimeService for time-travel support (optional).
If None, uses real time (dt_utils.now()).
"""
self._home_id = home_id
self._time_service = time_service
# Initialize components with dependency injection
self._cache = TibberPricesIntervalPoolFetchGroupCache()
self._cache = TibberPricesIntervalPoolFetchGroupCache(time_service=time_service)
self._index = TibberPricesIntervalPoolTimestampIndex()
self._gc = TibberPricesIntervalPoolGarbageCollector(self._cache, self._index, home_id)
self._fetcher = TibberPricesIntervalPoolFetcher(api, self._cache, self._index, home_id)
@ -154,19 +169,18 @@ class TibberPricesIntervalPool:
# Get cached intervals using index
cached_intervals = self._get_cached_intervals(start_time_iso, end_time_iso)
# Detect missing ranges
missing_ranges = self._fetcher.detect_gaps(cached_intervals, start_time_iso, end_time_iso)
# Check coverage - find ranges not in cache
missing_ranges = self._fetcher.check_coverage(cached_intervals, start_time_iso, end_time_iso)
if missing_ranges:
_LOGGER_DETAILS.debug(
"Detected %d missing range(s) for home %s - will make %d API call(s)",
len(missing_ranges),
"Coverage check for home %s: %d range(s) missing - will fetch from API",
self._home_id,
len(missing_ranges),
)
else:
_LOGGER_DETAILS.debug(
"All intervals available in cache for home %s - zero API calls needed",
"Coverage check for home %s: full coverage in cache - no API calls needed",
self._home_id,
)
@ -187,17 +201,232 @@ class TibberPricesIntervalPool:
final_result = self._get_cached_intervals(start_time_iso, end_time_iso)
_LOGGER_DETAILS.debug(
"Interval pool returning %d intervals for home %s "
"(initially %d cached, %d API calls made, final %d after re-reading cache)",
"Pool returning %d intervals for home %s (from cache: %d, fetched from API: %d ranges)",
len(final_result),
self._home_id,
len(cached_intervals),
len(missing_ranges),
len(final_result),
)
return final_result
async def get_sensor_data(
    self,
    api_client: TibberPricesApiClient,
    user_data: dict[str, Any],
    home_timezone: str | None = None,
    *,
    include_tomorrow: bool = True,
) -> list[dict[str, Any]]:
    """
    Return the price intervals sensors need (day-before-yesterday through tomorrow).

    The window spans four local calendar days:
    - Day before yesterday
    - Yesterday
    - Today
    - Tomorrow (only as far as it is present in the cache)

    Fetching and returning are deliberately decoupled:
    1. FETCH range depends on include_tomorrow:
       - True:  request up to the end of tomorrow.
       - False: request only up to the end of today.
    2. RETURN range is always the full window, so tomorrow's intervals that
       are already cached are handed back even when include_tomorrow=False.

    Args:
        api_client: API client forwarded to get_intervals().
        user_data: User data dict; also used as the timezone fallback.
        home_timezone: Optional timezone name (e.g., "Europe/Berlin"). When
            falsy, the timezone is looked up in user_data instead.
        include_tomorrow: Controls only the upper bound of the fetch range.
            It never narrows the returned data. Default True.

    Returns:
        Whatever _get_cached_intervals() yields for the full window after the
        fetch has completed.
    """
    # Explicit timezone wins; otherwise fall back to the home's entry in user_data
    zone_name = home_timezone or self._extract_timezone_from_user_data(user_data)
    zone = None
    if zone_name:
        zone = ZoneInfo(zone_name)

    # Honor an injected TimeService (time-travel) before falling back to real time
    if self._time_service:
        now = self._time_service.now()
    else:
        now = dt_utils.now()
    now_local = now if zone is None else now.astimezone(zone)

    def _midnight(day_offset: int) -> datetime:
        """Return local 00:00 of the day `day_offset` days away from now_local."""
        shifted = now_local + timedelta(days=day_offset)
        return shifted.replace(hour=0, minute=0, second=0, microsecond=0)

    # Window start is shared by the fetch and the returned data
    window_start = _midnight(-2)
    # Exclusive end of the returned window: 00:00 of the day after tomorrow
    window_end = _midnight(2)

    # Only the fetch range is narrowed when tomorrow is not requested
    fetch_end_time = window_end if include_tomorrow else _midnight(1)
    fetch_desc = "end-of-tomorrow" if include_tomorrow else "end-of-today"

    _LOGGER.debug(
        "Sensor data request for home %s: fetch %s to %s (%s), return up to %s",
        self._home_id,
        window_start.isoformat(),
        fetch_end_time.isoformat(),
        fetch_desc,
        window_end.isoformat(),
    )

    # The result is discarded on purpose: this call only populates the cache
    await self.get_intervals(
        api_client=api_client,
        user_data=user_data,
        start_time=window_start,
        end_time=fetch_end_time,
    )

    # Read back the FULL window so cached tomorrow data is never dropped
    return self._get_cached_intervals(
        window_start.isoformat(),
        window_end.isoformat(),
    )
def get_pool_stats(self) -> dict[str, Any]:
"""
Get statistics about the interval pool.
Returns comprehensive statistics for diagnostic sensors, separated into:
- Sensor intervals (protected range: day-before-yesterday to tomorrow)
- Cache statistics (entire pool including service-requested data)
Protected Range:
The protected range covers 4 days at 15-min resolution = 384 intervals.
These intervals are never evicted by garbage collection.
Cache Fill Level:
Shows how full the cache is relative to MAX_CACHE_SIZE (960).
100% is not bad - just means we're using the available space.
GC will evict oldest non-protected intervals when limit is reached.
Returns:
Dict with the following keys:
- sensor_intervals_count, sensor_intervals_expected, sensor_intervals_has_gaps:
copied from _get_sensor_interval_stats().
- cache_intervals_total: value of self._index.count().
- cache_intervals_limit: MAX_CACHE_SIZE.
- cache_fill_percent: total / limit * 100 rounded to one decimal (0 if limit is 0).
- cache_intervals_extra: total minus the protected-range count, never below 0.
- last_sensor_fetch: isoformat() of the newest fetch group's "fetched_at",
or None when there are no fetch groups.
- cache_oldest_interval, cache_newest_interval: min / max key of the raw index,
or None when no keys are available.
- fetch_groups_count: number of fetch groups in the cache.
"""
fetch_groups = self._cache.get_fetch_groups()
# === Sensor Intervals (Protected Range) ===
sensor_stats = self._get_sensor_interval_stats()
# === Cache Statistics (Entire Pool) ===
cache_total = self._index.count()
cache_limit = MAX_CACHE_SIZE
# The limit is checked first so a zero limit cannot cause a ZeroDivisionError
cache_fill_percent = round((cache_total / cache_limit) * 100, 1) if cache_limit > 0 else 0
cache_extra = max(0, cache_total - sensor_stats["count"]) # Intervals outside protected range
# === Timestamps ===
# Last sensor fetch (for protected range data)
last_sensor_fetch: str | None = None
oldest_interval: str | None = None
newest_interval: str | None = None
if fetch_groups:
# Find newest fetch group (most recent API call)
# NOTE(review): assumes every group's "fetched_at" is mutually comparable and
# offers .isoformat() (presumably a datetime) - confirm against the cache/storage code
newest_group = max(fetch_groups, key=lambda g: g["fetched_at"])
last_sensor_fetch = newest_group["fetched_at"].isoformat()
# Find oldest and newest intervals across all fetch groups
all_timestamps = list(self._index.get_raw_index().keys())
if all_timestamps:
# min()/max() compare the index keys directly; this matches chronological order
# only if the keys are uniformly formatted timestamp strings - TODO confirm
oldest_interval = min(all_timestamps)
newest_interval = max(all_timestamps)
return {
# Sensor intervals (protected range)
"sensor_intervals_count": sensor_stats["count"],
"sensor_intervals_expected": sensor_stats["expected"],
"sensor_intervals_has_gaps": sensor_stats["has_gaps"],
# Cache statistics
"cache_intervals_total": cache_total,
"cache_intervals_limit": cache_limit,
"cache_fill_percent": cache_fill_percent,
"cache_intervals_extra": cache_extra,
# Timestamps
"last_sensor_fetch": last_sensor_fetch,
"cache_oldest_interval": oldest_interval,
"cache_newest_interval": newest_interval,
# Fetch groups (API calls)
"fetch_groups_count": len(fetch_groups),
}
def _get_sensor_interval_stats(self) -> dict[str, Any]:
"""
Get statistics for sensor intervals (protected range).
Protected range: day-before-yesterday 00:00 to day-after-tomorrow 00:00.
Expected: 4 days * 24 hours * 4 intervals = 384 intervals.
Returns:
Dict with count, expected, and has_gaps.
"""
start_iso, end_iso = self._cache.get_protected_range()
start_dt = datetime.fromisoformat(start_iso)
end_dt = datetime.fromisoformat(end_iso)
# Count expected intervals (15-min resolution)
expected_count = int((end_dt - start_dt).total_seconds() / (15 * 60))
# Count actual intervals in range
actual_count = 0
current_dt = start_dt
while current_dt < end_dt:
current_key = current_dt.isoformat()[:19]
if self._index.contains(current_key):
actual_count += 1
current_dt += timedelta(minutes=15)
return {
"count": actual_count,
"expected": expected_count,
"has_gaps": actual_count < expected_count,
}
def _has_gaps_in_protected_range(self) -> bool:
"""
Check if there are gaps in the protected date range.
Delegates to _get_sensor_interval_stats() for consistency.
Returns:
True if any gaps exist, False if protected range is complete.
"""
return self._get_sensor_interval_stats()["has_gaps"]
def _extract_timezone_from_user_data(self, user_data: dict[str, Any]) -> str | None:
"""Extract timezone for this home from user_data."""
if not user_data:
return None
viewer = user_data.get("viewer", {})
homes = viewer.get("homes", [])
for home in homes:
if home.get("id") == self._home_id:
return home.get("timeZone")
return None
def _get_cached_intervals(
self,
start_time_iso: str,
@ -208,13 +437,17 @@ class TibberPricesIntervalPool:
Uses timestamp_index for O(1) lookups per timestamp.
IMPORTANT: Returns shallow copies of interval dicts to prevent external
mutations (e.g., by parse_all_timestamps()) from affecting cached data.
The Pool cache must remain immutable to ensure consistent behavior.
Args:
start_time_iso: ISO timestamp string (inclusive).
end_time_iso: ISO timestamp string (exclusive).
Returns:
List of cached interval dicts in time range (may be empty or incomplete).
Sorted by startsAt timestamp.
Sorted by startsAt timestamp. Each dict is a shallow copy.
"""
# Parse query range once
@ -239,7 +472,9 @@ class TibberPricesIntervalPool:
fetch_groups = self._cache.get_fetch_groups()
fetch_group = fetch_groups[location["fetch_group_index"]]
interval = fetch_group["intervals"][location["interval_index"]]
result.append(interval)
# CRITICAL: Return shallow copy to prevent external mutations
# (e.g., parse_all_timestamps() converts startsAt to datetime in-place)
result.append(dict(interval))
# Move to next expected interval
current_dt += timedelta(minutes=interval_minutes)
@ -249,9 +484,9 @@ class TibberPricesIntervalPool:
interval_minutes = INTERVAL_QUARTER_HOURLY
_LOGGER_DETAILS.debug(
"Cache lookup for home %s: found %d intervals in range %s to %s",
self._home_id,
"Retrieved %d intervals from cache for home %s (range %s to %s)",
len(result),
self._home_id,
start_time_iso,
end_time_iso,
)
@ -289,7 +524,7 @@ class TibberPricesIntervalPool:
intervals_to_touch = []
for interval in intervals:
starts_at_normalized = interval["startsAt"][:19]
starts_at_normalized = _normalize_starts_at(interval["startsAt"])
if not self._index.contains(starts_at_normalized):
new_intervals.append(interval)
else:
@ -321,7 +556,7 @@ class TibberPricesIntervalPool:
# Update timestamp index for all new intervals
for interval_index, interval in enumerate(new_intervals):
starts_at_normalized = interval["startsAt"][:19]
starts_at_normalized = _normalize_starts_at(interval["startsAt"])
self._index.add(interval, fetch_group_index, interval_index)
_LOGGER_DETAILS.debug(
@ -482,7 +717,7 @@ class TibberPricesIntervalPool:
living_intervals = []
for interval_idx, interval in enumerate(fetch_group["intervals"]):
starts_at_normalized = interval["startsAt"][:19]
starts_at_normalized = _normalize_starts_at(interval["startsAt"])
# Check if interval is still referenced in index
location = self._index.get(starts_at_normalized)
@ -517,6 +752,7 @@ class TibberPricesIntervalPool:
api: TibberPricesApiClient,
hass: Any | None = None,
entry_id: str | None = None,
time_service: TibberPricesTimeService | None = None,
) -> TibberPricesIntervalPool | None:
"""
Restore interval pool manager from storage.
@ -529,6 +765,7 @@ class TibberPricesIntervalPool:
api: API client for fetching intervals.
hass: HomeAssistant instance for auto-save (optional).
entry_id: Config entry ID for auto-save (optional).
time_service: TimeService for time-travel support (optional).
Returns:
Restored TibberPricesIntervalPool instance, or None if format unknown/corrupted.
@ -548,7 +785,7 @@ class TibberPricesIntervalPool:
home_id = data["home_id"]
# Create manager with home_id from storage
manager = cls(home_id=home_id, api=api, hass=hass, entry_id=entry_id)
manager = cls(home_id=home_id, api=api, hass=hass, entry_id=entry_id, time_service=time_service)
# Restore fetch groups to cache
for serialized_group in data.get("fetch_groups", []):