diff --git a/custom_components/tibber_prices/__init__.py b/custom_components/tibber_prices/__init__.py index d9dd3d8..4f25465 100644 --- a/custom_components/tibber_prices/__init__.py +++ b/custom_components/tibber_prices/__init__.py @@ -28,6 +28,12 @@ from .const import ( ) from .coordinator import STORAGE_VERSION, TibberPricesDataUpdateCoordinator from .data import TibberPricesData +from .interval_pool import ( + TibberPricesIntervalPool, + async_load_pool_state, + async_remove_pool_storage, + async_save_pool_state, +) from .services import async_setup_services if TYPE_CHECKING: @@ -165,10 +171,49 @@ async def async_setup_entry( version=str(integration.version) if integration.version else "unknown", ) + # Get home_id from config entry (required for single-home pool architecture) + home_id = entry.data.get("home_id") + if not home_id: + msg = f"[{entry.title}] Config entry missing home_id (required for interval pool)" + raise ConfigEntryAuthFailed(msg) + + # Create or load interval pool for this config entry (single-home architecture) + pool_state = await async_load_pool_state(hass, entry.entry_id) + if pool_state: + interval_pool = TibberPricesIntervalPool.from_dict( + pool_state, + api=api_client, + hass=hass, + entry_id=entry.entry_id, + ) + if interval_pool is None: + # Old multi-home format or corrupted → create new pool + LOGGER.info( + "[%s] Interval pool storage invalid/corrupted, creating new pool (will rebuild from API)", + entry.title, + ) + interval_pool = TibberPricesIntervalPool( + home_id=home_id, + api=api_client, + hass=hass, + entry_id=entry.entry_id, + ) + else: + LOGGER.debug("[%s] Interval pool restored from storage (auto-save enabled)", entry.title) + else: + interval_pool = TibberPricesIntervalPool( + home_id=home_id, + api=api_client, + hass=hass, + entry_id=entry.entry_id, + ) + LOGGER.debug("[%s] Created new interval pool (auto-save enabled)", entry.title) + coordinator = TibberPricesDataUpdateCoordinator( hass=hass, config_entry=entry, 
api_client=api_client, + interval_pool=interval_pool, ) # CRITICAL: Load cache BEFORE first refresh to ensure user_data is available @@ -180,6 +225,7 @@ async def async_setup_entry( client=api_client, integration=integration, coordinator=coordinator, + interval_pool=interval_pool, ) # https://developers.home-assistant.io/docs/integration_fetching_data#coordinated-single-api-poll-for-data-for-all-entities @@ -199,6 +245,12 @@ async def async_unload_entry( entry: TibberPricesConfigEntry, ) -> bool: """Unload a config entry.""" + # Save interval pool state before unloading + if entry.runtime_data is not None and entry.runtime_data.interval_pool is not None: + pool_state = entry.runtime_data.interval_pool.to_dict() + await async_save_pool_state(hass, entry.entry_id, pool_state) + LOGGER.debug("[%s] Interval pool state saved on unload", entry.title) + unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS) if unload_ok and entry.runtime_data is not None: @@ -222,10 +274,15 @@ async def async_remove_entry( entry: TibberPricesConfigEntry, ) -> None: """Handle removal of an entry.""" + # Remove coordinator cache storage if storage := Store(hass, STORAGE_VERSION, f"{DOMAIN}.{entry.entry_id}"): LOGGER.debug(f"[tibber_prices] async_remove_entry removing cache store for entry_id={entry.entry_id}") await storage.async_remove() + # Remove interval pool storage + await async_remove_pool_storage(hass, entry.entry_id) + LOGGER.debug(f"[tibber_prices] async_remove_entry removed interval pool storage for entry_id={entry.entry_id}") + async def async_reload_entry( hass: HomeAssistant, diff --git a/custom_components/tibber_prices/api/client.py b/custom_components/tibber_prices/api/client.py index 0d27f31..fca6689 100644 --- a/custom_components/tibber_prices/api/client.py +++ b/custom_components/tibber_prices/api/client.py @@ -7,7 +7,7 @@ import base64 import logging import re import socket -from datetime import timedelta +from datetime import datetime, timedelta 
from typing import TYPE_CHECKING, Any from zoneinfo import ZoneInfo @@ -33,6 +33,7 @@ if TYPE_CHECKING: from custom_components.tibber_prices.coordinator.time_service import TibberPricesTimeService _LOGGER = logging.getLogger(__name__) +_LOGGER_API_DETAILS = logging.getLogger(__name__ + ".details") class TibberPricesApiClient: @@ -136,6 +137,49 @@ class TibberPricesApiClient: query_type=TibberPricesQueryType.USER, ) + async def async_get_price_info_for_range( + self, + home_id: str, + user_data: dict[str, Any], + start_time: datetime, + end_time: datetime, + ) -> dict: + """ + Get price info for a specific time range with automatic routing. + + This is a convenience wrapper around interval_pool.get_price_intervals_for_range(). + + Args: + home_id: Home ID to fetch price data for. + user_data: User data dict containing home metadata (including timezone). + start_time: Start of the range (inclusive, timezone-aware). + end_time: End of the range (exclusive, timezone-aware). + + Returns: + Dict with "home_id" and "price_info" (list of intervals). + + Raises: + TibberPricesApiClientError: If arguments invalid or requests fail. + + """ + # Import here to avoid circular dependency (interval_pool imports TibberPricesApiClient) + from custom_components.tibber_prices.interval_pool import ( # noqa: PLC0415 + get_price_intervals_for_range, + ) + + price_info = await get_price_intervals_for_range( + api_client=self, + home_id=home_id, + user_data=user_data, + start_time=start_time, + end_time=end_time, + ) + + return { + "home_id": home_id, + "price_info": price_info, + } + async def async_get_price_info(self, home_id: str, user_data: dict[str, Any]) -> dict: """ Get price info for a single home. 
@@ -228,6 +272,284 @@ class TibberPricesApiClient: "price_info": price_info, } + async def async_get_price_info_range( + self, + home_id: str, + user_data: dict[str, Any], + start_time: datetime, + end_time: datetime, + ) -> dict: + """ + Get historical price info for a specific time range using priceInfoRange endpoint. + + Uses the priceInfoRange GraphQL endpoint for flexible historical data queries. + Intended for intervals BEFORE "day before yesterday midnight" (outside PRICE_INFO scope). + + Automatically handles API pagination if Tibber limits batch size. + + Args: + home_id: Home ID to fetch price data for. + user_data: User data dict containing home metadata (including timezone). + start_time: Start of the range (inclusive, timezone-aware). + end_time: End of the range (exclusive, timezone-aware). + + Returns: + Dict with "home_id" and "price_info" (list of intervals). + + Raises: + TibberPricesApiClientError: If arguments invalid or request fails. + + """ + if not user_data: + msg = "User data required for timezone-aware price fetching - fetch user data first" + raise TibberPricesApiClientError(msg) + + if not home_id: + msg = "Home ID is required" + raise TibberPricesApiClientError(msg) + + if start_time >= end_time: + msg = f"Invalid time range: start_time ({start_time}) must be before end_time ({end_time})" + raise TibberPricesApiClientError(msg) + + _LOGGER_API_DETAILS.debug( + "fetch_price_info_range called with: start_time=%s (type=%s, tzinfo=%s), end_time=%s (type=%s, tzinfo=%s)", + start_time, + type(start_time), + start_time.tzinfo, + end_time, + type(end_time), + end_time.tzinfo, + ) + + # Calculate cursor and interval count + start_cursor = self._encode_cursor(start_time) + interval_count = self._calculate_interval_count(start_time, end_time) + + _LOGGER_API_DETAILS.debug( + "Calculated cursor for range: start_time=%s, cursor_time=%s, encoded=%s", + start_time, + start_time, + start_cursor, + ) + + # Fetch all intervals with automatic paging + 
price_info = await self._fetch_price_info_with_paging( + home_id=home_id, + start_cursor=start_cursor, + interval_count=interval_count, + ) + + return { + "home_id": home_id, + "price_info": price_info, + } + + def _calculate_interval_count(self, start_time: datetime, end_time: datetime) -> int: + """Calculate number of intervals needed based on date range.""" + time_diff = end_time - start_time + resolution_change_date = datetime(2025, 10, 1, tzinfo=start_time.tzinfo) + + if start_time < resolution_change_date: + # Pre-resolution-change: hourly intervals only + interval_count = int(time_diff.total_seconds() / 3600) # 3600s = 1h + _LOGGER_API_DETAILS.debug( + "Time range is pre-2025-10-01: expecting hourly intervals (count: %d)", + interval_count, + ) + else: + # Post-resolution-change: quarter-hourly intervals + interval_count = int(time_diff.total_seconds() / 900) # 900s = 15min + _LOGGER_API_DETAILS.debug( + "Time range is post-2025-10-01: expecting quarter-hourly intervals (count: %d)", + interval_count, + ) + + return interval_count + + async def _fetch_price_info_with_paging( + self, + home_id: str, + start_cursor: str, + interval_count: int, + ) -> list[dict[str, Any]]: + """ + Fetch price info with automatic pagination if API limits batch size. + + GraphQL Cursor Pagination: + - endCursor points to the last returned element (inclusive) + - Use 'after: endCursor' to get elements AFTER that cursor + - If count < requested, more pages available + + Args: + home_id: Home ID to fetch price data for. + start_cursor: Base64-encoded start cursor. + interval_count: Total number of intervals to fetch. + + Returns: + List of all price interval dicts across all pages. 
+ + """ + price_info = [] + remaining_intervals = interval_count + cursor = start_cursor + page = 0 + + while remaining_intervals > 0: + page += 1 + + # Fetch one page + page_data = await self._fetch_single_page( + home_id=home_id, + cursor=cursor, + requested_count=remaining_intervals, + page=page, + ) + + if not page_data: + break + + # Extract intervals and pagination info + page_intervals = page_data["intervals"] + returned_count = page_data["count"] + end_cursor = page_data["end_cursor"] + has_next_page = page_data.get("has_next_page", False) + + price_info.extend(page_intervals) + + _LOGGER_API_DETAILS.debug( + "Page %d: Received %d intervals for home %s (total so far: %d/%d, endCursor=%s, hasNextPage=%s)", + page, + returned_count, + home_id, + len(price_info), + interval_count, + end_cursor, + has_next_page, + ) + + # Update remaining count + remaining_intervals -= returned_count + + # Check if we need more pages + # Continue if: (1) we still need more intervals AND (2) API has more data + if remaining_intervals > 0 and end_cursor: + cursor = end_cursor + _LOGGER_API_DETAILS.debug( + "Still need %d more intervals - fetching next page with cursor %s", + remaining_intervals, + cursor, + ) + else: + # Done: Either we have all intervals we need, or API has no more data + if remaining_intervals > 0: + _LOGGER.warning( + "API has no more data - received %d out of %d requested intervals (missing %d)", + len(price_info), + interval_count, + remaining_intervals, + ) + else: + _LOGGER.debug( + "Pagination complete - received all %d requested intervals", + interval_count, + ) + break + + _LOGGER_API_DETAILS.debug( + "Fetched %d total historical intervals for home %s across %d page(s)", + len(price_info), + home_id, + page, + ) + + return price_info + + async def _fetch_single_page( + self, + home_id: str, + cursor: str, + requested_count: int, + page: int, + ) -> dict[str, Any] | None: + """ + Fetch a single page of price intervals. 
+ + Args: + home_id: Home ID to fetch price data for. + cursor: Base64-encoded cursor for this page. + requested_count: Number of intervals to request. + page: Page number (for logging). + + Returns: + Dict with "intervals", "count", and "end_cursor" keys, or None if no data. + + """ + query = f""" + {{viewer{{ + home(id: "{home_id}") {{ + id + currentSubscription {{ + priceInfoRange(resolution:QUARTER_HOURLY, first:{requested_count}, after: "{cursor}") {{ + pageInfo{{ + count + hasNextPage + startCursor + endCursor + }} + edges{{ + cursor + node{{ + startsAt total level + }} + }} + }} + }} + }} + }}}} + """ + + _LOGGER_API_DETAILS.debug( + "Fetching historical price info for home %s (page %d): %d intervals from cursor %s", + home_id, + page, + requested_count, + cursor, + ) + + data = await self._api_wrapper( + data={"query": query}, + query_type=TibberPricesQueryType.PRICE_INFO_RANGE, + ) + + # Parse response + viewer = data.get("viewer", {}) + home = viewer.get("home") + + if not home: + _LOGGER.warning("Home %s not found in API response", home_id) + return None + + if "currentSubscription" not in home or home["currentSubscription"] is None: + _LOGGER.warning("Home %s has no active subscription - price data will be unavailable", home_id) + return None + + # Extract priceInfoRange data + subscription = home["currentSubscription"] + price_info_range = subscription.get("priceInfoRange", {}) + page_info = price_info_range.get("pageInfo", {}) + edges = price_info_range.get("edges", []) + + # Flatten edges to interval list + intervals = [edge["node"] for edge in edges if "node" in edge] + + return { + "intervals": intervals, + "count": page_info.get("count", len(intervals)), + "end_cursor": page_info.get("endCursor"), + "has_next_page": page_info.get("hasNextPage", False), + } + def _extract_home_timezones(self, user_data: dict[str, Any]) -> dict[str, str]: """ Extract home_id -> timezone mapping from user_data. 
@@ -249,30 +571,30 @@ class TibberPricesApiClient: if home_id and timezone: home_timezones[home_id] = timezone - _LOGGER.debug("Extracted timezone %s for home %s", timezone, home_id) + _LOGGER_API_DETAILS.debug("Extracted timezone %s for home %s", timezone, home_id) elif home_id: _LOGGER.warning("Home %s has no timezone in user data, will use fallback", home_id) return home_timezones - def _calculate_cursor_for_home(self, home_timezone: str | None) -> str: + def _calculate_day_before_yesterday_midnight(self, home_timezone: str | None) -> datetime: """ - Calculate cursor (day before yesterday midnight) for a home's timezone. + Calculate day before yesterday midnight in home's timezone. + + CRITICAL: Uses REAL TIME (dt_utils.now()), NOT TimeService.now(). + This ensures API boundary calculations are based on actual current time, + not simulated time from TimeService. Args: - home_timezone: Timezone string (e.g., "Europe/Oslo", "America/New_York"). + home_timezone: Timezone string (e.g., "Europe/Oslo"). If None, falls back to HA system timezone. Returns: - Base64-encoded ISO timestamp string for use as GraphQL cursor. + Timezone-aware datetime for day before yesterday midnight. 
""" - if not self.time: - msg = "TimeService not initialized" - raise TibberPricesApiClientError(msg) - - # Get current time - now = self.time.now() + # Get current REAL time (not TimeService) + now = dt_utils.now() # Convert to home's timezone or fallback to HA system timezone if home_timezone: @@ -290,15 +612,54 @@ class TibberPricesApiClient: # Fallback to HA system timezone now_in_home_tz = dt_utils.as_local(now) - # Calculate day before yesterday midnight in home's timezone - day_before_yesterday_midnight = (now_in_home_tz - timedelta(days=2)).replace( - hour=0, minute=0, second=0, microsecond=0 - ) + # Calculate day before yesterday midnight + return (now_in_home_tz - timedelta(days=2)).replace(hour=0, minute=0, second=0, microsecond=0) - # Convert to ISO format and base64 encode - iso_string = day_before_yesterday_midnight.isoformat() + def _encode_cursor(self, timestamp: datetime) -> str: + """ + Encode a timestamp as base64 cursor for GraphQL API. + + Args: + timestamp: Timezone-aware datetime to encode. + + Returns: + Base64-encoded ISO timestamp string. + + """ + iso_string = timestamp.isoformat() return base64.b64encode(iso_string.encode()).decode() + def _parse_timestamp(self, timestamp_str: str) -> datetime: + """ + Parse ISO timestamp string to timezone-aware datetime. + + Args: + timestamp_str: ISO format timestamp string. + + Returns: + Timezone-aware datetime object. + + """ + return dt_utils.parse_datetime(timestamp_str) or dt_utils.now() + + def _calculate_cursor_for_home(self, home_timezone: str | None) -> str: + """ + Calculate cursor (day before yesterday midnight) for a home's timezone. + + Convenience wrapper around _calculate_day_before_yesterday_midnight() + and _encode_cursor() for backward compatibility with existing code. + + Args: + home_timezone: Timezone string (e.g., "Europe/Oslo", "America/New_York"). + If None, falls back to HA system timezone. + + Returns: + Base64-encoded ISO timestamp string for use as GraphQL cursor. 
+ + """ + day_before_yesterday_midnight = self._calculate_day_before_yesterday_midnight(home_timezone) + return self._encode_cursor(day_before_yesterday_midnight) + async def _make_request( self, headers: dict[str, str], @@ -306,7 +667,7 @@ class TibberPricesApiClient: query_type: TibberPricesQueryType, ) -> dict[str, Any]: """Make an API request with comprehensive error handling for network issues.""" - _LOGGER.debug("Making API request with data: %s", data) + _LOGGER_API_DETAILS.debug("Making API request with data: %s", data) try: # More granular timeout configuration for better network failure handling @@ -326,7 +687,7 @@ class TibberPricesApiClient: verify_response_or_raise(response) response_json = await response.json() - _LOGGER.debug("Received API response: %s", response_json) + _LOGGER_API_DETAILS.debug("Received API response: %s", response_json) await verify_graphql_response(response_json, query_type) @@ -434,7 +795,7 @@ class TibberPricesApiClient: time_since_last_request = now - self._last_request_time if time_since_last_request < self._min_request_interval: sleep_time = (self._min_request_interval - time_since_last_request).total_seconds() - _LOGGER.debug( + _LOGGER_API_DETAILS.debug( "Rate limiting: waiting %s seconds before next request", sleep_time, ) @@ -480,23 +841,18 @@ class TibberPricesApiClient: """Handle retry logic for API-specific errors.""" error_msg = str(error) - # Non-retryable: Invalid queries - if "Invalid GraphQL query" in error_msg or "Bad request" in error_msg: + # Non-retryable: Invalid queries, bad requests, empty data + # Empty data means API has no data for the requested range - retrying won't help + if "Invalid GraphQL query" in error_msg or "Bad request" in error_msg or "Empty data received" in error_msg: return False, 0 - # Rate limits - special handling with extracted delay + # Rate limits - only retry if server explicitly says so if "Rate limit exceeded" in error_msg or "rate limited" in error_msg.lower(): delay = 
self._extract_retry_delay(error, retry) return True, delay - # Empty data - retryable with capped exponential backoff - if "Empty data received" in error_msg: - delay = min(self._retry_delay * (2**retry), 60) # Cap at 60 seconds - return True, delay - - # Other API errors - retryable with capped exponential backoff - delay = min(self._retry_delay * (2**retry), 30) # Cap at 30 seconds - return True, delay + # Other API errors - not retryable (assume permanent issue) + return False, 0 def _extract_retry_delay(self, error: Exception, retry: int) -> int: """Extract retry delay from rate limit error or use exponential backoff.""" diff --git a/custom_components/tibber_prices/api/helpers.py b/custom_components/tibber_prices/api/helpers.py index 8b461b7..9fdef64 100644 --- a/custom_components/tibber_prices/api/helpers.py +++ b/custom_components/tibber_prices/api/helpers.py @@ -19,6 +19,7 @@ from .exceptions import ( ) _LOGGER = logging.getLogger(__name__) +_LOGGER_DETAILS = logging.getLogger(__name__ + ".details") HTTP_BAD_REQUEST = 400 HTTP_UNAUTHORIZED = 401 @@ -95,14 +96,137 @@ async def verify_graphql_response(response_json: dict, query_type: TibberPricesQ TibberPricesApiClientError.GRAPHQL_ERROR.format(message="Response missing data object") ) - # Empty data check (for retry logic) - always check, regardless of query_type + # Empty data check - validate response completeness + # This is NOT a retryable error - API simply has no data for the requested range if is_data_empty(response_json["data"], query_type.value): - _LOGGER.debug("Empty data detected for query_type: %s", query_type) + _LOGGER_DETAILS.debug("Empty data detected for query_type: %s - API has no data available", query_type) raise TibberPricesApiClientError( TibberPricesApiClientError.EMPTY_DATA_ERROR.format(query_type=query_type.value) ) +def _check_user_data_empty(data: dict) -> bool: + """Check if user data is empty or incomplete.""" + has_user_id = ( + "viewer" in data + and isinstance(data["viewer"], 
dict) + and "userId" in data["viewer"] + and data["viewer"]["userId"] is not None + ) + has_homes = ( + "viewer" in data + and isinstance(data["viewer"], dict) + and "homes" in data["viewer"] + and isinstance(data["viewer"]["homes"], list) + and len(data["viewer"]["homes"]) > 0 + ) + is_empty = not has_user_id or not has_homes + _LOGGER_DETAILS.debug( + "Viewer check - has_user_id: %s, has_homes: %s, is_empty: %s", + has_user_id, + has_homes, + is_empty, + ) + return is_empty + + +def _check_price_info_empty(data: dict) -> bool: + """ + Check if price_info data is empty or incomplete. + + Note: Missing currentSubscription is VALID (home without active contract). + Only check for structural issues, not missing data that legitimately might not exist. + """ + viewer = data.get("viewer", {}) + home_data = viewer.get("home") + + if not home_data: + _LOGGER_DETAILS.debug("No home data found in price_info response") + return True + + _LOGGER_DETAILS.debug("Checking price_info for single home") + + # Missing currentSubscription is VALID - home has no active contract + # This is not an "empty data" error, it's a legitimate state + if "currentSubscription" not in home_data or home_data["currentSubscription"] is None: + _LOGGER_DETAILS.debug("No currentSubscription - home has no active contract (valid state)") + return False # NOT empty - this is expected for homes without subscription + + subscription = home_data["currentSubscription"] + + # Check priceInfoRange (yesterday data - optional, may not be available) + has_yesterday = ( + "priceInfoRange" in subscription + and subscription["priceInfoRange"] is not None + and "edges" in subscription["priceInfoRange"] + and subscription["priceInfoRange"]["edges"] + ) + + # Check priceInfo for today's data (required if subscription exists) + has_price_info = "priceInfo" in subscription and subscription["priceInfo"] is not None + has_today = ( + has_price_info + and "today" in subscription["priceInfo"] + and 
subscription["priceInfo"]["today"] is not None + and len(subscription["priceInfo"]["today"]) > 0 + ) + + # Only require today's data - yesterday is optional + # If subscription exists but no today data, that's a structural problem + is_empty = not has_today + + _LOGGER_DETAILS.debug( + "Price info check - priceInfoRange: %s, today: %s, is_empty: %s", + bool(has_yesterday), + bool(has_today), + is_empty, + ) + return is_empty + + +def _check_price_info_range_empty(data: dict) -> bool: + """ + Check if price_info_range data is empty or incomplete. + + For historical range queries, empty edges array is VALID (no data available + for that time range, e.g., too old). Only structural problems are errors. + """ + viewer = data.get("viewer", {}) + home_data = viewer.get("home") + + if not home_data: + _LOGGER_DETAILS.debug("No home data found in price_info_range response") + return True + + subscription = home_data.get("currentSubscription") + if not subscription: + _LOGGER_DETAILS.debug("Missing currentSubscription in home") + return True + + # For price_info_range, check if the structure exists + # Empty edges array is VALID (no data for that time range) + price_info_range = subscription.get("priceInfoRange") + if price_info_range is None: + _LOGGER_DETAILS.debug("Missing priceInfoRange in subscription") + return True + + if "edges" not in price_info_range: + _LOGGER_DETAILS.debug("Missing edges key in priceInfoRange") + return True + + edges = price_info_range["edges"] + if not isinstance(edges, list): + _LOGGER_DETAILS.debug("priceInfoRange edges is not a list") + return True + + # Empty edges is VALID for historical queries (data not available) + _LOGGER_DETAILS.debug( + "Price info range check - structure valid, edge_count: %s (empty is OK for old data)", + len(edges), + ) + return False # Structure is valid, even if edges is empty + + def is_data_empty(data: dict, query_type: str) -> bool: """ Check if the response data is empty or incomplete. 
@@ -117,85 +241,28 @@ def is_data_empty(data: dict, query_type: str) -> bool: - Must have range data - Must have today data - tomorrow can be empty if we have valid historical and today data - """ - _LOGGER.debug("Checking if data is empty for query_type %s", query_type) - is_empty = False + For price info range: + - Must have priceInfoRange with edges + Used by interval pool for historical data fetching + """ + _LOGGER_DETAILS.debug("Checking if data is empty for query_type %s", query_type) + try: if query_type == "user": - has_user_id = ( - "viewer" in data - and isinstance(data["viewer"], dict) - and "userId" in data["viewer"] - and data["viewer"]["userId"] is not None - ) - has_homes = ( - "viewer" in data - and isinstance(data["viewer"], dict) - and "homes" in data["viewer"] - and isinstance(data["viewer"]["homes"], list) - and len(data["viewer"]["homes"]) > 0 - ) - is_empty = not has_user_id or not has_homes - _LOGGER.debug( - "Viewer check - has_user_id: %s, has_homes: %s, is_empty: %s", - has_user_id, - has_homes, - is_empty, - ) + return _check_user_data_empty(data) + if query_type == "price_info": + return _check_price_info_empty(data) + if query_type == "price_info_range": + return _check_price_info_range_empty(data) - elif query_type == "price_info": - # Check for single home data (viewer.home) - viewer = data.get("viewer", {}) - home_data = viewer.get("home") - - if not home_data: - _LOGGER.debug("No home data found in price_info response") - is_empty = True - else: - _LOGGER.debug("Checking price_info for single home") - - if not home_data or "currentSubscription" not in home_data or home_data["currentSubscription"] is None: - _LOGGER.debug("Missing currentSubscription in home") - is_empty = True - else: - subscription = home_data["currentSubscription"] - - # Check priceInfoRange (96 quarter-hourly intervals) - has_yesterday = ( - "priceInfoRange" in subscription - and subscription["priceInfoRange"] is not None - and "edges" in 
subscription["priceInfoRange"] - and subscription["priceInfoRange"]["edges"] - ) - - # Check priceInfo for today's data - has_price_info = "priceInfo" in subscription and subscription["priceInfo"] is not None - has_today = ( - has_price_info - and "today" in subscription["priceInfo"] - and subscription["priceInfo"]["today"] is not None - and len(subscription["priceInfo"]["today"]) > 0 - ) - - # Data is empty if we don't have historical data or today's data - is_empty = not has_yesterday or not has_today - - _LOGGER.debug( - "Price info check - priceInfoRange: %s, today: %s, is_empty: %s", - bool(has_yesterday), - bool(has_today), - is_empty, - ) - - else: - _LOGGER.debug("Unknown query type %s, treating as non-empty", query_type) - is_empty = False + # Unknown query type + _LOGGER_DETAILS.debug("Unknown query type %s, treating as non-empty", query_type) except (KeyError, IndexError, TypeError) as error: - _LOGGER.debug("Error checking data emptiness: %s", error) - is_empty = True - - return is_empty + _LOGGER_DETAILS.debug("Error checking data emptiness: %s", error) + return True + else: + return False def prepare_headers(access_token: str, version: str) -> dict[str, str]: diff --git a/custom_components/tibber_prices/api/queries.py b/custom_components/tibber_prices/api/queries.py index 73d2d66..0a2a391 100644 --- a/custom_components/tibber_prices/api/queries.py +++ b/custom_components/tibber_prices/api/queries.py @@ -19,12 +19,23 @@ class TibberPricesQueryType(Enum): - Provides the core dataset needed for live data, recent historical context (important until tomorrow's data arrives), and tomorrow's forecast - Tibber likely has optimized caching for this frequently-accessed data range + - Boundary: FROM "day before yesterday midnight" (real time) onwards PRICE_INFO_RANGE: - Used for historical data older than day before yesterday - Allows flexible date range queries with cursor-based pagination - Required for any intervals beyond the 4-day window of PRICE_INFO - Use 
this for historical analysis, comparisons, or trend calculations + - Boundary: BEFORE "day before yesterday midnight" (real time) + + ROUTING: + - Use async_get_price_info_for_range() wrapper for automatic routing + - Wrapper intelligently splits requests spanning the boundary: + * Fully historical range (end < boundary) → PRICE_INFO_RANGE only + * Fully recent range (start >= boundary) → PRICE_INFO only + * Spanning range → Both queries, merged results + - Boundary calculated using REAL TIME (dt_utils.now()), not TimeService + to ensure predictable API responses USER: - Fetches user account data and home metadata diff --git a/custom_components/tibber_prices/const.py b/custom_components/tibber_prices/const.py index 4dc12e4..6d17409 100644 --- a/custom_components/tibber_prices/const.py +++ b/custom_components/tibber_prices/const.py @@ -24,6 +24,11 @@ DATA_CHART_CONFIG = "chart_config" # Key for chart export config in hass.data # Configuration keys CONF_EXTENDED_DESCRIPTIONS = "extended_descriptions" +CONF_VIRTUAL_TIME_OFFSET_DAYS = ( + "virtual_time_offset_days" # Time-travel: days offset (negative only, e.g., -7 = 7 days ago) +) +CONF_VIRTUAL_TIME_OFFSET_HOURS = "virtual_time_offset_hours" # Time-travel: hours offset (-23 to +23) +CONF_VIRTUAL_TIME_OFFSET_MINUTES = "virtual_time_offset_minutes" # Time-travel: minutes offset (-59 to +59) CONF_BEST_PRICE_FLEX = "best_price_flex" CONF_PEAK_PRICE_FLEX = "peak_price_flex" CONF_BEST_PRICE_MIN_DISTANCE_FROM_AVG = "best_price_min_distance_from_avg" @@ -54,6 +59,9 @@ ATTRIBUTION = "Data provided by Tibber" # Integration name should match manifest.json DEFAULT_NAME = "Tibber Price Information & Ratings" DEFAULT_EXTENDED_DESCRIPTIONS = False +DEFAULT_VIRTUAL_TIME_OFFSET_DAYS = 0 # No time offset (live mode) +DEFAULT_VIRTUAL_TIME_OFFSET_HOURS = 0 +DEFAULT_VIRTUAL_TIME_OFFSET_MINUTES = 0 DEFAULT_BEST_PRICE_FLEX = 15 # 15% base flexibility - optimal for relaxation mode (default enabled) # Peak price flexibility is set to -20% 
(20% base flexibility - optimal for relaxation mode). # This is intentionally more flexible than best price (15%) because peak price periods can be more variable, diff --git a/custom_components/tibber_prices/coordinator/core.py b/custom_components/tibber_prices/coordinator/core.py index f4df201..e8c8686 100644 --- a/custom_components/tibber_prices/coordinator/core.py +++ b/custom_components/tibber_prices/coordinator/core.py @@ -163,6 +163,7 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]): hass: HomeAssistant, config_entry: ConfigEntry, api_client: TibberPricesApiClient, + interval_pool: Any, # TibberPricesIntervalPool - Any to avoid circular import ) -> None: """Initialize the coordinator.""" super().__init__( @@ -182,6 +183,9 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]): # Use the API client from runtime_data (created in __init__.py with proper TOKEN handling) self.api = api_client + # Use the shared interval pool (one per config entry/Tibber account) + self.interval_pool = interval_pool + # Storage for persistence storage_key = f"{DOMAIN}.{config_entry.entry_id}" self._store = Store(hass, STORAGE_VERSION, storage_key) diff --git a/custom_components/tibber_prices/data.py b/custom_components/tibber_prices/data.py index 2916b37..73f610e 100644 --- a/custom_components/tibber_prices/data.py +++ b/custom_components/tibber_prices/data.py @@ -11,6 +11,7 @@ if TYPE_CHECKING: from .api import TibberPricesApiClient from .coordinator import TibberPricesDataUpdateCoordinator + from .interval_pool import TibberPricesIntervalPool @dataclass @@ -20,6 +21,7 @@ class TibberPricesData: client: TibberPricesApiClient coordinator: TibberPricesDataUpdateCoordinator integration: Integration + interval_pool: TibberPricesIntervalPool # Shared interval pool per config entry if TYPE_CHECKING: diff --git a/custom_components/tibber_prices/interval_pool/__init__.py b/custom_components/tibber_prices/interval_pool/__init__.py 
new file mode 100644 index 0000000..6d172fa --- /dev/null +++ b/custom_components/tibber_prices/interval_pool/__init__.py @@ -0,0 +1,21 @@ +"""Interval Pool - Intelligent interval caching and routing.""" + +from .manager import TibberPricesIntervalPool +from .routing import get_price_intervals_for_range +from .storage import ( + INTERVAL_POOL_STORAGE_VERSION, + async_load_pool_state, + async_remove_pool_storage, + async_save_pool_state, + get_storage_key, +) + +__all__ = [ + "INTERVAL_POOL_STORAGE_VERSION", + "TibberPricesIntervalPool", + "async_load_pool_state", + "async_remove_pool_storage", + "async_save_pool_state", + "get_price_intervals_for_range", + "get_storage_key", +] diff --git a/custom_components/tibber_prices/interval_pool/cache.py b/custom_components/tibber_prices/interval_pool/cache.py new file mode 100644 index 0000000..a4109a9 --- /dev/null +++ b/custom_components/tibber_prices/interval_pool/cache.py @@ -0,0 +1,194 @@ +"""Fetch group cache for price intervals.""" + +from __future__ import annotations + +import logging +from datetime import datetime, timedelta +from typing import Any + +from homeassistant.util import dt as dt_utils + +_LOGGER = logging.getLogger(__name__) +_LOGGER_DETAILS = logging.getLogger(__name__ + ".details") + +# Protected date range: day-before-yesterday to tomorrow (4 days total) +PROTECTED_DAYS_BEFORE = 2 # day-before-yesterday + yesterday +PROTECTED_DAYS_AFTER = 1 # tomorrow + + +class TibberPricesIntervalPoolFetchGroupCache: + """ + Storage for fetch groups with protected range management. + + A fetch group is a collection of intervals fetched at the same time, + stored together with their fetch timestamp for GC purposes. + + Structure: + { + "fetched_at": datetime, # When this group was fetched + "intervals": [dict, ...] # List of interval dicts + } + + Protected Range: + Intervals within day-before-yesterday to tomorrow are protected + and never evicted from cache. This range shifts daily automatically. 
+ + Example (today = 2025-11-25): + Protected: 2025-11-23 00:00 to 2025-11-27 00:00 + """ + + def __init__(self) -> None: + """Initialize empty fetch group cache.""" + self._fetch_groups: list[dict[str, Any]] = [] + + # Protected range cache (invalidated daily) + self._protected_range_cache: tuple[str, str] | None = None + self._protected_range_cache_date: str | None = None + + def add_fetch_group( + self, + intervals: list[dict[str, Any]], + fetched_at: datetime, + ) -> int: + """ + Add new fetch group to cache. + + Args: + intervals: List of interval dicts (sorted by startsAt). + fetched_at: Timestamp when intervals were fetched. + + Returns: + Index of the newly added fetch group. + + """ + fetch_group = { + "fetched_at": fetched_at, + "intervals": intervals, + } + + fetch_group_index = len(self._fetch_groups) + self._fetch_groups.append(fetch_group) + + _LOGGER_DETAILS.debug( + "Added fetch group %d: %d intervals (fetched at %s)", + fetch_group_index, + len(intervals), + fetched_at.isoformat(), + ) + + return fetch_group_index + + def get_fetch_groups(self) -> list[dict[str, Any]]: + """Get all fetch groups (read-only access).""" + return self._fetch_groups + + def set_fetch_groups(self, fetch_groups: list[dict[str, Any]]) -> None: + """Replace all fetch groups (used during GC).""" + self._fetch_groups = fetch_groups + + def get_protected_range(self) -> tuple[str, str]: + """ + Get protected date range as ISO strings. + + Protected range: day-before-yesterday 00:00 to day-after-tomorrow 00:00. + This range shifts daily automatically. + + Returns: + Tuple of (start_iso, end_iso) for protected range. + Start is inclusive, end is exclusive. 
+ + Example (today = 2025-11-25): + Returns: ("2025-11-23T00:00:00+01:00", "2025-11-27T00:00:00+01:00") + Protected days: 2025-11-23, 2025-11-24, 2025-11-25, 2025-11-26 + + """ + # Check cache validity (invalidate daily) + now = dt_utils.now() + today_date_str = now.date().isoformat() + + if self._protected_range_cache_date == today_date_str and self._protected_range_cache: + return self._protected_range_cache + + # Calculate new protected range + today_midnight = now.replace(hour=0, minute=0, second=0, microsecond=0) + + # Start: day-before-yesterday at 00:00 + start_dt = today_midnight - timedelta(days=PROTECTED_DAYS_BEFORE) + + # End: day after tomorrow at 00:00 (exclusive, so tomorrow is included) + end_dt = today_midnight + timedelta(days=PROTECTED_DAYS_AFTER + 1) + + # Convert to ISO strings and cache + start_iso = start_dt.isoformat() + end_iso = end_dt.isoformat() + + self._protected_range_cache = (start_iso, end_iso) + self._protected_range_cache_date = today_date_str + + return start_iso, end_iso + + def is_interval_protected(self, interval: dict[str, Any]) -> bool: + """ + Check if interval is within protected date range. + + Protected intervals are never evicted from cache. + + Args: + interval: Interval dict with "startsAt" ISO timestamp. + + Returns: + True if interval is protected (within protected range). + + """ + starts_at_iso = interval["startsAt"] + start_protected_iso, end_protected_iso = self.get_protected_range() + + # Fast string comparison (ISO timestamps are lexicographically sortable) + return start_protected_iso <= starts_at_iso < end_protected_iso + + def count_total_intervals(self) -> int: + """Count total intervals across all fetch groups.""" + return sum(len(group["intervals"]) for group in self._fetch_groups) + + def to_dict(self) -> dict[str, Any]: + """ + Serialize fetch groups for storage. + + Returns: + Dict with serializable fetch groups. 
+ + """ + return { + "fetch_groups": [ + { + "fetched_at": group["fetched_at"].isoformat(), + "intervals": group["intervals"], + } + for group in self._fetch_groups + ], + } + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> TibberPricesIntervalPoolFetchGroupCache: + """ + Restore fetch groups from storage. + + Args: + data: Dict with "fetch_groups" list. + + Returns: + TibberPricesIntervalPoolFetchGroupCache instance with restored data. + + """ + cache = cls() + + fetch_groups_data = data.get("fetch_groups", []) + cache._fetch_groups = [ + { + "fetched_at": datetime.fromisoformat(group["fetched_at"]), + "intervals": group["intervals"], + } + for group in fetch_groups_data + ] + + return cache diff --git a/custom_components/tibber_prices/interval_pool/fetcher.py b/custom_components/tibber_prices/interval_pool/fetcher.py new file mode 100644 index 0000000..3a14a81 --- /dev/null +++ b/custom_components/tibber_prices/interval_pool/fetcher.py @@ -0,0 +1,322 @@ +"""Interval fetcher - gap detection and API coordination for interval pool.""" + +from __future__ import annotations + +import logging +from datetime import UTC, datetime, timedelta +from typing import TYPE_CHECKING, Any + +from homeassistant.util import dt as dt_utils + +if TYPE_CHECKING: + from collections.abc import Callable + + from custom_components.tibber_prices.api import TibberPricesApiClient + + from .cache import TibberPricesIntervalPoolFetchGroupCache + from .index import TibberPricesIntervalPoolTimestampIndex + +_LOGGER = logging.getLogger(__name__) +_LOGGER_DETAILS = logging.getLogger(__name__ + ".details") + +# Resolution change date (hourly before, quarter-hourly after) +# Use UTC for constant - timezone adjusted at runtime when comparing +RESOLUTION_CHANGE_DATETIME = datetime(2025, 10, 1, tzinfo=UTC) +RESOLUTION_CHANGE_ISO = "2025-10-01T00:00:00" + +# Interval lengths in minutes +INTERVAL_HOURLY = 60 +INTERVAL_QUARTER_HOURLY = 15 + +# Minimum gap sizes in seconds +MIN_GAP_HOURLY = 
3600 # 1 hour +MIN_GAP_QUARTER_HOURLY = 900 # 15 minutes + +# Tolerance for time comparisons (±1 second for floating point/timezone issues) +TIME_TOLERANCE_SECONDS = 1 +TIME_TOLERANCE_MINUTES = 1 + + +class TibberPricesIntervalPoolFetcher: + """Fetch missing intervals from API based on gap detection.""" + + def __init__( + self, + api: TibberPricesApiClient, + cache: TibberPricesIntervalPoolFetchGroupCache, + index: TibberPricesIntervalPoolTimestampIndex, + home_id: str, + ) -> None: + """ + Initialize fetcher. + + Args: + api: API client for Tibber GraphQL queries. + cache: Fetch group cache for storage operations. + index: Timestamp index for gap detection. + home_id: Tibber home ID for API calls. + + """ + self._api = api + self._cache = cache + self._index = index + self._home_id = home_id + + def detect_gaps( + self, + cached_intervals: list[dict[str, Any]], + start_time_iso: str, + end_time_iso: str, + ) -> list[tuple[str, str]]: + """ + Detect missing time ranges that need to be fetched. + + This method minimizes API calls by: + 1. Finding all gaps in cached intervals + 2. Treating each cached interval as a discrete timestamp + 3. Gaps are time ranges between consecutive cached timestamps + + Handles both resolutions: + - Pre-2025-10-01: Hourly intervals (:00:00 only) + - Post-2025-10-01: Quarter-hourly intervals (:00:00, :15:00, :30:00, :45:00) + - DST transitions (23h/25h days) + + The API requires an interval count (first: X parameter). + For historical data (pre-2025-10-01), Tibber only stored hourly prices. + The API returns whatever intervals exist for the requested period. + + Args: + cached_intervals: List of cached intervals (may be empty). + start_time_iso: ISO timestamp string (inclusive). + end_time_iso: ISO timestamp string (exclusive). + + Returns: + List of (start_iso, end_iso) tuples representing missing ranges. + Each tuple represents a continuous time span that needs fetching. + Ranges are automatically split at resolution change boundary. 
+ + Example: + Requested: 2025-11-13T00:00:00 to 2025-11-13T02:00:00 + Cached: [00:00, 00:15, 01:30, 01:45] + Gaps: [(00:15, 01:30)] - missing intervals between groups + + """ + if not cached_intervals: + # No cache → fetch entire range + return [(start_time_iso, end_time_iso)] + + # Filter and sort cached intervals within requested range + in_range_intervals = [ + interval for interval in cached_intervals if start_time_iso <= interval["startsAt"] < end_time_iso + ] + sorted_intervals = sorted(in_range_intervals, key=lambda x: x["startsAt"]) + + if not sorted_intervals: + # All cached intervals are outside requested range + return [(start_time_iso, end_time_iso)] + + missing_ranges = [] + + # Parse start/end times once + start_time_dt = datetime.fromisoformat(start_time_iso) + end_time_dt = datetime.fromisoformat(end_time_iso) + + # Get first cached interval datetime for resolution logic + first_cached_dt = datetime.fromisoformat(sorted_intervals[0]["startsAt"]) + resolution_change_dt = RESOLUTION_CHANGE_DATETIME.replace(tzinfo=first_cached_dt.tzinfo) + + # Check gap before first cached interval + time_diff_before_first = (first_cached_dt - start_time_dt).total_seconds() + if time_diff_before_first > TIME_TOLERANCE_SECONDS: + missing_ranges.append((start_time_iso, sorted_intervals[0]["startsAt"])) + _LOGGER_DETAILS.debug( + "Gap before first cached interval: %s to %s (%.1f seconds)", + start_time_iso, + sorted_intervals[0]["startsAt"], + time_diff_before_first, + ) + + # Check gaps between consecutive cached intervals + for i in range(len(sorted_intervals) - 1): + current_interval = sorted_intervals[i] + next_interval = sorted_intervals[i + 1] + + current_start = current_interval["startsAt"] + next_start = next_interval["startsAt"] + + # Parse to datetime for accurate time difference + current_dt = datetime.fromisoformat(current_start) + next_dt = datetime.fromisoformat(next_start) + + # Calculate time difference in minutes + time_diff_minutes = (next_dt - 
current_dt).total_seconds() / 60 + + # Determine expected interval length based on date + expected_interval_minutes = ( + INTERVAL_HOURLY if current_dt < resolution_change_dt else INTERVAL_QUARTER_HOURLY + ) + + # Only create gap if intervals are NOT consecutive + if time_diff_minutes > expected_interval_minutes + TIME_TOLERANCE_MINUTES: + # Gap exists - missing intervals between them + # Missing range starts AFTER current interval ends + current_interval_end = current_dt + timedelta(minutes=expected_interval_minutes) + missing_ranges.append((current_interval_end.isoformat(), next_start)) + _LOGGER_DETAILS.debug( + "Gap between cached intervals: %s (ends at %s) to %s (%.1f min gap, expected %d min)", + current_start, + current_interval_end.isoformat(), + next_start, + time_diff_minutes, + expected_interval_minutes, + ) + + # Check gap after last cached interval + # An interval's startsAt time represents the START of that interval. + # The interval covers [startsAt, startsAt + interval_length). + # So the last interval ENDS at (startsAt + interval_length), not at startsAt! 
+ last_cached_dt = datetime.fromisoformat(sorted_intervals[-1]["startsAt"]) + + # Calculate when the last interval ENDS + interval_minutes = INTERVAL_QUARTER_HOURLY if last_cached_dt >= resolution_change_dt else INTERVAL_HOURLY + last_interval_end_dt = last_cached_dt + timedelta(minutes=interval_minutes) + + # Only create gap if there's uncovered time AFTER the last interval ends + time_diff_after_last = (end_time_dt - last_interval_end_dt).total_seconds() + + # Need at least one full interval of gap + min_gap_seconds = MIN_GAP_QUARTER_HOURLY if last_cached_dt >= resolution_change_dt else MIN_GAP_HOURLY + if time_diff_after_last >= min_gap_seconds: + # Missing range starts AFTER the last cached interval ends + missing_ranges.append((last_interval_end_dt.isoformat(), end_time_iso)) + _LOGGER_DETAILS.debug( + "Gap after last cached interval: %s (ends at %s) to %s (%.1f seconds, need >= %d)", + sorted_intervals[-1]["startsAt"], + last_interval_end_dt.isoformat(), + end_time_iso, + time_diff_after_last, + min_gap_seconds, + ) + + if not missing_ranges: + _LOGGER.debug( + "No gaps detected - all intervals cached for range %s to %s", + start_time_iso, + end_time_iso, + ) + return missing_ranges + + # Split ranges at resolution change boundary (2025-10-01 00:00:00) + # This simplifies interval count calculation in API calls: + # - Pre-2025-10-01: Always hourly (1 interval/hour) + # - Post-2025-10-01: Always quarter-hourly (4 intervals/hour) + return self._split_at_resolution_boundary(missing_ranges) + + def _split_at_resolution_boundary(self, ranges: list[tuple[str, str]]) -> list[tuple[str, str]]: + """ + Split time ranges at resolution change boundary. + + Args: + ranges: List of (start_iso, end_iso) tuples. + + Returns: + List of ranges split at 2025-10-01T00:00:00 boundary. 
+ + """ + split_ranges = [] + + for start_iso, end_iso in ranges: + # Check if range crosses the boundary + if start_iso < RESOLUTION_CHANGE_ISO < end_iso: + # Split into two ranges: before and after boundary + split_ranges.append((start_iso, RESOLUTION_CHANGE_ISO)) + split_ranges.append((RESOLUTION_CHANGE_ISO, end_iso)) + _LOGGER_DETAILS.debug( + "Split range at resolution boundary: (%s, %s) → (%s, %s) + (%s, %s)", + start_iso, + end_iso, + start_iso, + RESOLUTION_CHANGE_ISO, + RESOLUTION_CHANGE_ISO, + end_iso, + ) + else: + # Range doesn't cross boundary - keep as is + split_ranges.append((start_iso, end_iso)) + + return split_ranges + + async def fetch_missing_ranges( + self, + api_client: TibberPricesApiClient, + user_data: dict[str, Any], + missing_ranges: list[tuple[str, str]], + *, + on_intervals_fetched: Callable[[list[dict[str, Any]], str], None] | None = None, + ) -> list[list[dict[str, Any]]]: + """ + Fetch missing intervals from API. + + Makes one API call per missing range. Uses routing logic to select + the optimal endpoint (PRICE_INFO vs PRICE_INFO_RANGE). + + Args: + api_client: TibberPricesApiClient instance for API calls. + user_data: User data dict containing home metadata. + missing_ranges: List of (start_iso, end_iso) tuples to fetch. + on_intervals_fetched: Optional callback for each fetch result. + Receives (intervals, fetch_time_iso). + + Returns: + List of interval lists (one per missing range). + Each sublist contains intervals from one API call. + + Raises: + TibberPricesApiClientError: If API calls fail. 
+ + """ + # Import here to avoid circular dependency + from custom_components.tibber_prices.interval_pool.routing import ( # noqa: PLC0415 + get_price_intervals_for_range, + ) + + fetch_time_iso = dt_utils.now().isoformat() + all_fetched_intervals = [] + + for idx, (missing_start_iso, missing_end_iso) in enumerate(missing_ranges, start=1): + _LOGGER_DETAILS.debug( + "API call %d/%d for home %s: fetching range %s to %s", + idx, + len(missing_ranges), + self._home_id, + missing_start_iso, + missing_end_iso, + ) + + # Parse ISO strings back to datetime for API call + missing_start_dt = datetime.fromisoformat(missing_start_iso) + missing_end_dt = datetime.fromisoformat(missing_end_iso) + + # Fetch intervals from API - routing returns ALL intervals (unfiltered) + fetched_intervals = await get_price_intervals_for_range( + api_client=api_client, + home_id=self._home_id, + user_data=user_data, + start_time=missing_start_dt, + end_time=missing_end_dt, + ) + + all_fetched_intervals.append(fetched_intervals) + + _LOGGER_DETAILS.debug( + "Fetched %d intervals from API for home %s (fetch time: %s)", + len(fetched_intervals), + self._home_id, + fetch_time_iso, + ) + + # Notify callback if provided (for immediate caching) + if on_intervals_fetched: + on_intervals_fetched(fetched_intervals, fetch_time_iso) + + return all_fetched_intervals diff --git a/custom_components/tibber_prices/interval_pool/garbage_collector.py b/custom_components/tibber_prices/interval_pool/garbage_collector.py new file mode 100644 index 0000000..283acf8 --- /dev/null +++ b/custom_components/tibber_prices/interval_pool/garbage_collector.py @@ -0,0 +1,237 @@ +"""Garbage collector for interval cache eviction.""" + +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from .cache import TibberPricesIntervalPoolFetchGroupCache + from .index import TibberPricesIntervalPoolTimestampIndex + +_LOGGER = logging.getLogger(__name__) +_LOGGER_DETAILS = 
logging.getLogger(__name__ + ".details") + +# Maximum number of intervals to cache +# 10 days @ 15min resolution = 10 * 96 = 960 intervals +MAX_CACHE_SIZE = 960 + + +class TibberPricesIntervalPoolGarbageCollector: + """ + Manages cache eviction and dead interval cleanup. + + Eviction Strategy: + - Evict oldest fetch groups first (by fetched_at timestamp) + - Protected intervals (day-before-yesterday to tomorrow) are NEVER evicted + - Evict complete fetch groups, not individual intervals + + Dead Interval Cleanup: + When intervals are "touched" (re-fetched), they move to a new fetch group + but remain in the old group. This creates "dead intervals" that occupy + memory but are no longer referenced by the index. + """ + + def __init__( + self, + cache: TibberPricesIntervalPoolFetchGroupCache, + index: TibberPricesIntervalPoolTimestampIndex, + home_id: str, + ) -> None: + """ + Initialize garbage collector. + + Args: + cache: Fetch group cache to manage. + index: Timestamp index for living interval detection. + home_id: Home ID for logging purposes. + + """ + self._home_id = home_id + self._cache = cache + self._index = index + + def run_gc(self) -> bool: + """ + Run garbage collection if needed. + + Process: + 1. Clean up dead intervals from all fetch groups + 2. Count total intervals + 3. If > MAX_CACHE_SIZE, evict oldest fetch groups + 4. Rebuild index after eviction + + Returns: + True if any cleanup or eviction happened, False otherwise. 
+ + """ + fetch_groups = self._cache.get_fetch_groups() + + # Phase 1: Clean up dead intervals + dead_count = self._cleanup_dead_intervals(fetch_groups) + + if dead_count > 0: + _LOGGER_DETAILS.debug( + "GC cleaned %d dead intervals (home %s)", + dead_count, + self._home_id, + ) + + # Phase 2: Count total intervals after cleanup + total_intervals = self._cache.count_total_intervals() + + if total_intervals <= MAX_CACHE_SIZE: + _LOGGER_DETAILS.debug( + "GC cleanup only for home %s: %d intervals <= %d limit (no eviction needed)", + self._home_id, + total_intervals, + MAX_CACHE_SIZE, + ) + return dead_count > 0 + + # Phase 3: Evict old fetch groups + evicted_indices = self._evict_old_groups(fetch_groups, total_intervals) + + if not evicted_indices: + # All intervals are protected, cannot evict + return dead_count > 0 + + # Phase 4: Rebuild cache and index + new_fetch_groups = [group for idx, group in enumerate(fetch_groups) if idx not in evicted_indices] + self._cache.set_fetch_groups(new_fetch_groups) + self._index.rebuild(new_fetch_groups) + + _LOGGER_DETAILS.debug( + "GC evicted %d fetch groups (home %s): %d intervals remaining", + len(evicted_indices), + self._home_id, + self._cache.count_total_intervals(), + ) + + return True + + def _cleanup_dead_intervals(self, fetch_groups: list[dict[str, Any]]) -> int: + """ + Remove dead intervals from all fetch groups. + + Dead intervals are no longer referenced by the index (they were touched + and moved to a newer fetch group). + + Args: + fetch_groups: List of fetch groups to clean. + + Returns: + Total number of dead intervals removed. 
+ + """ + total_dead = 0 + + for group_idx, group in enumerate(fetch_groups): + old_intervals = group["intervals"] + if not old_intervals: + continue + + # Find living intervals (still in index at correct position) + living_intervals = [] + + for interval_idx, interval in enumerate(old_intervals): + starts_at_normalized = interval["startsAt"][:19] + index_entry = self._index.get(starts_at_normalized) + + if index_entry is not None: + # Check if index points to THIS position + if index_entry["fetch_group_index"] == group_idx and index_entry["interval_index"] == interval_idx: + living_intervals.append(interval) + else: + # Dead: index points elsewhere + total_dead += 1 + else: + # Dead: not in index + total_dead += 1 + + # Replace with cleaned list if any dead intervals found + if len(living_intervals) < len(old_intervals): + group["intervals"] = living_intervals + dead_count = len(old_intervals) - len(living_intervals) + _LOGGER_DETAILS.debug( + "GC cleaned %d dead intervals from fetch group %d (home %s)", + dead_count, + group_idx, + self._home_id, + ) + + return total_dead + + def _evict_old_groups( + self, + fetch_groups: list[dict[str, Any]], + total_intervals: int, + ) -> set[int]: + """ + Determine which fetch groups to evict to stay under MAX_CACHE_SIZE. + + Only evicts groups without protected intervals. + Groups evicted oldest-first (by fetched_at). + + Args: + fetch_groups: List of fetch groups. + total_intervals: Total interval count. + + Returns: + Set of fetch group indices to evict. 
+ + """ + start_protected_iso, end_protected_iso = self._cache.get_protected_range() + + _LOGGER_DETAILS.debug( + "Protected range: %s to %s", + start_protected_iso[:10], + end_protected_iso[:10], + ) + + # Classify: protected vs evictable + evictable_groups = [] + + for idx, group in enumerate(fetch_groups): + has_protected = any(self._cache.is_interval_protected(interval) for interval in group["intervals"]) + + if not has_protected: + evictable_groups.append((idx, group)) + + # Sort by fetched_at (oldest first) + evictable_groups.sort(key=lambda x: x[1]["fetched_at"]) + + _LOGGER_DETAILS.debug( + "GC: %d protected groups, %d evictable groups", + len(fetch_groups) - len(evictable_groups), + len(evictable_groups), + ) + + # Evict until under limit + evicted_indices = set() + remaining = total_intervals + + for idx, group in evictable_groups: + if remaining <= MAX_CACHE_SIZE: + break + + group_count = len(group["intervals"]) + evicted_indices.add(idx) + remaining -= group_count + + _LOGGER_DETAILS.debug( + "GC evicting group %d (fetched %s): %d intervals, %d remaining", + idx, + group["fetched_at"].isoformat(), + group_count, + remaining, + ) + + if not evicted_indices: + _LOGGER.warning( + "GC cannot evict any groups (home %s): all %d intervals are protected", + self._home_id, + total_intervals, + ) + + return evicted_indices diff --git a/custom_components/tibber_prices/interval_pool/index.py b/custom_components/tibber_prices/interval_pool/index.py new file mode 100644 index 0000000..88342f4 --- /dev/null +++ b/custom_components/tibber_prices/interval_pool/index.py @@ -0,0 +1,151 @@ +"""Timestamp index for O(1) interval lookups.""" + +from __future__ import annotations + +import logging +from typing import Any + +_LOGGER = logging.getLogger(__name__) +_LOGGER_DETAILS = logging.getLogger(__name__ + ".details") + + +class TibberPricesIntervalPoolTimestampIndex: + """ + Fast O(1) timestamp-based interval lookup. 
+ + Maps normalized ISO timestamp strings to fetch group + interval indices. + + Structure: + { + "2025-11-25T00:00:00": { + "fetch_group_index": 0, # Index in fetch groups list + "interval_index": 2 # Index within that group's intervals + }, + ... + } + + Normalization: + Timestamps are normalized to 19 characters (YYYY-MM-DDTHH:MM:SS) + by truncating microseconds and timezone info for fast string comparison. + """ + + def __init__(self) -> None: + """Initialize empty timestamp index.""" + self._index: dict[str, dict[str, int]] = {} + + def add( + self, + interval: dict[str, Any], + fetch_group_index: int, + interval_index: int, + ) -> None: + """ + Add interval to index. + + Args: + interval: Interval dict with "startsAt" ISO timestamp. + fetch_group_index: Index of fetch group containing this interval. + interval_index: Index within that fetch group's intervals list. + + """ + starts_at_normalized = self._normalize_timestamp(interval["startsAt"]) + self._index[starts_at_normalized] = { + "fetch_group_index": fetch_group_index, + "interval_index": interval_index, + } + + def get(self, timestamp: str) -> dict[str, int] | None: + """ + Look up interval location by timestamp. + + Args: + timestamp: ISO timestamp string (will be normalized). + + Returns: + Dict with fetch_group_index and interval_index, or None if not found. + + """ + starts_at_normalized = self._normalize_timestamp(timestamp) + return self._index.get(starts_at_normalized) + + def contains(self, timestamp: str) -> bool: + """ + Check if timestamp exists in index. + + Args: + timestamp: ISO timestamp string (will be normalized). + + Returns: + True if timestamp is in index. + + """ + starts_at_normalized = self._normalize_timestamp(timestamp) + return starts_at_normalized in self._index + + def remove(self, timestamp: str) -> None: + """ + Remove timestamp from index. + + Args: + timestamp: ISO timestamp string (will be normalized). 
+ + """ + starts_at_normalized = self._normalize_timestamp(timestamp) + self._index.pop(starts_at_normalized, None) + + def clear(self) -> None: + """Clear entire index.""" + self._index.clear() + + def rebuild(self, fetch_groups: list[dict[str, Any]]) -> None: + """ + Rebuild index from fetch groups. + + Used after GC operations that modify fetch group structure. + + Args: + fetch_groups: List of fetch group dicts. + + """ + self._index.clear() + + for fetch_group_idx, group in enumerate(fetch_groups): + for interval_idx, interval in enumerate(group["intervals"]): + starts_at_normalized = self._normalize_timestamp(interval["startsAt"]) + self._index[starts_at_normalized] = { + "fetch_group_index": fetch_group_idx, + "interval_index": interval_idx, + } + + _LOGGER_DETAILS.debug( + "Rebuilt index: %d timestamps indexed", + len(self._index), + ) + + def get_raw_index(self) -> dict[str, dict[str, int]]: + """Get raw index dict (for serialization).""" + return self._index + + def count(self) -> int: + """Count total indexed timestamps.""" + return len(self._index) + + @staticmethod + def _normalize_timestamp(timestamp: str) -> str: + """ + Normalize ISO timestamp for indexing. + + Truncates to 19 characters (YYYY-MM-DDTHH:MM:SS) to remove + microseconds and timezone info for consistent string comparison. + + Args: + timestamp: Full ISO timestamp string. + + Returns: + Normalized timestamp (19 chars). 
+ + Example: + "2025-11-25T00:00:00.000+01:00" → "2025-11-25T00:00:00" + + """ + return timestamp[:19] diff --git a/custom_components/tibber_prices/interval_pool/manager.py b/custom_components/tibber_prices/interval_pool/manager.py new file mode 100644 index 0000000..b96ca2f --- /dev/null +++ b/custom_components/tibber_prices/interval_pool/manager.py @@ -0,0 +1,539 @@ +"""Interval pool manager - main coordinator for interval caching.""" + +from __future__ import annotations + +import asyncio +import logging +from datetime import datetime, timedelta +from typing import TYPE_CHECKING, Any + +from custom_components.tibber_prices.api.exceptions import TibberPricesApiClientError +from homeassistant.util import dt as dt_utils + +from .cache import TibberPricesIntervalPoolFetchGroupCache +from .fetcher import TibberPricesIntervalPoolFetcher +from .garbage_collector import TibberPricesIntervalPoolGarbageCollector +from .index import TibberPricesIntervalPoolTimestampIndex +from .storage import async_save_pool_state + +if TYPE_CHECKING: + from custom_components.tibber_prices.api.client import TibberPricesApiClient + +_LOGGER = logging.getLogger(__name__) +_LOGGER_DETAILS = logging.getLogger(__name__ + ".details") + +# Interval lengths in minutes +INTERVAL_HOURLY = 60 +INTERVAL_QUARTER_HOURLY = 15 + +# Debounce delay for auto-save (seconds) +DEBOUNCE_DELAY_SECONDS = 3.0 + + +class TibberPricesIntervalPool: + """ + High-performance interval cache manager for a single Tibber home. 
+ + Coordinates all interval pool components: + - TibberPricesIntervalPoolFetchGroupCache: Stores fetch groups and manages protected ranges + - TibberPricesIntervalPoolTimestampIndex: Provides O(1) timestamp lookups + - TibberPricesIntervalPoolGarbageCollector: Evicts old fetch groups when cache exceeds limits + - TibberPricesIntervalPoolFetcher: Detects gaps and fetches missing intervals from API + + Architecture: + - Each manager handles exactly ONE home (1:1 with config entry) + - home_id is immutable after initialization + - All operations are thread-safe via asyncio locks + + Features: + - Fetch-time based eviction (oldest fetch groups removed first) + - Protected date range (day-before-yesterday to tomorrow never evicted) + - Fast O(1) lookups by timestamp + - Automatic gap detection and API fetching + - Debounced auto-save to prevent excessive I/O + + Example: + manager = TibberPricesIntervalPool(home_id="abc123", api=client, hass=hass, entry_id=entry.entry_id) + intervals = await manager.get_intervals( + api_client=client, + user_data=data, + start_time=datetime(...), + end_time=datetime(...), + ) + + """ + + def __init__( + self, + *, + home_id: str, + api: TibberPricesApiClient, + hass: Any | None = None, + entry_id: str | None = None, + ) -> None: + """ + Initialize interval pool manager. + + Args: + home_id: Tibber home ID (required, immutable). + api: API client for fetching intervals. + hass: HomeAssistant instance for auto-save (optional). + entry_id: Config entry ID for auto-save (optional). 
+ + """ + self._home_id = home_id + + # Initialize components with dependency injection + self._cache = TibberPricesIntervalPoolFetchGroupCache() + self._index = TibberPricesIntervalPoolTimestampIndex() + self._gc = TibberPricesIntervalPoolGarbageCollector(self._cache, self._index, home_id) + self._fetcher = TibberPricesIntervalPoolFetcher(api, self._cache, self._index, home_id) + + # Auto-save support + self._hass = hass + self._entry_id = entry_id + self._background_tasks: set[asyncio.Task] = set() + self._save_debounce_task: asyncio.Task | None = None + self._save_lock = asyncio.Lock() + + async def get_intervals( + self, + api_client: TibberPricesApiClient, + user_data: dict[str, Any], + start_time: datetime, + end_time: datetime, + ) -> list[dict[str, Any]]: + """ + Get price intervals for time range (cached + fetch missing). + + Main entry point for retrieving intervals. Coordinates: + 1. Check cache for existing intervals + 2. Detect missing time ranges + 3. Fetch missing ranges from API + 4. Add new intervals to cache (may trigger GC) + 5. Return complete interval list + + User receives ALL requested intervals even if cache exceeds limits. + Cache only keeps the most recent intervals (FIFO eviction). + + Args: + api_client: TibberPricesApiClient instance for API calls. + user_data: User data dict containing home metadata. + start_time: Start of range (inclusive, timezone-aware). + end_time: End of range (exclusive, timezone-aware). + + Returns: + List of price interval dicts, sorted by startsAt. + Contains ALL intervals in requested range (cached + fetched). + + Raises: + TibberPricesApiClientError: If API calls fail or validation errors. 
+ + """ + # Validate inputs + if not user_data: + msg = "User data required for timezone-aware price fetching" + raise TibberPricesApiClientError(msg) + + if start_time >= end_time: + msg = f"Invalid time range: start_time ({start_time}) must be before end_time ({end_time})" + raise TibberPricesApiClientError(msg) + + # Convert to ISO strings for cache operations + start_time_iso = start_time.isoformat() + end_time_iso = end_time.isoformat() + + _LOGGER_DETAILS.debug( + "Interval pool request for home %s: range %s to %s", + self._home_id, + start_time_iso, + end_time_iso, + ) + + # Get cached intervals using index + cached_intervals = self._get_cached_intervals(start_time_iso, end_time_iso) + + # Detect missing ranges + missing_ranges = self._fetcher.detect_gaps(cached_intervals, start_time_iso, end_time_iso) + + if missing_ranges: + _LOGGER_DETAILS.debug( + "Detected %d missing range(s) for home %s - will make %d API call(s)", + len(missing_ranges), + self._home_id, + len(missing_ranges), + ) + else: + _LOGGER_DETAILS.debug( + "All intervals available in cache for home %s - zero API calls needed", + self._home_id, + ) + + # Fetch missing ranges from API + if missing_ranges: + fetch_time_iso = dt_utils.now().isoformat() + + # Fetch with callback for immediate caching + await self._fetcher.fetch_missing_ranges( + api_client=api_client, + user_data=user_data, + missing_ranges=missing_ranges, + on_intervals_fetched=lambda intervals, _: self._add_intervals(intervals, fetch_time_iso), + ) + + # After caching all API responses, read from cache again to get final result + # This ensures we return exactly what user requested, filtering out extra intervals + final_result = self._get_cached_intervals(start_time_iso, end_time_iso) + + _LOGGER_DETAILS.debug( + "Interval pool returning %d intervals for home %s " + "(initially %d cached, %d API calls made, final %d after re-reading cache)", + len(final_result), + self._home_id, + len(cached_intervals), + len(missing_ranges), + 
len(final_result), + ) + + return final_result + + def _get_cached_intervals( + self, + start_time_iso: str, + end_time_iso: str, + ) -> list[dict[str, Any]]: + """ + Get cached intervals for time range using timestamp index. + + Uses timestamp_index for O(1) lookups per timestamp. + + Args: + start_time_iso: ISO timestamp string (inclusive). + end_time_iso: ISO timestamp string (exclusive). + + Returns: + List of cached interval dicts in time range (may be empty or incomplete). + Sorted by startsAt timestamp. + + """ + # Parse query range once + start_time_dt = datetime.fromisoformat(start_time_iso) + end_time_dt = datetime.fromisoformat(end_time_iso) + + # Use index to find intervals: iterate through expected timestamps + result = [] + current_dt = start_time_dt + + # Determine interval step (15 min post-2025-10-01, 60 min pre) + resolution_change_dt = datetime(2025, 10, 1, tzinfo=start_time_dt.tzinfo) + interval_minutes = INTERVAL_QUARTER_HOURLY if current_dt >= resolution_change_dt else INTERVAL_HOURLY + + while current_dt < end_time_dt: + # Check if this timestamp exists in index (O(1) lookup) + current_dt_key = current_dt.isoformat()[:19] + location = self._index.get(current_dt_key) + + if location is not None: + # Get interval from fetch group + fetch_groups = self._cache.get_fetch_groups() + fetch_group = fetch_groups[location["fetch_group_index"]] + interval = fetch_group["intervals"][location["interval_index"]] + result.append(interval) + + # Move to next expected interval + current_dt += timedelta(minutes=interval_minutes) + + # Handle resolution change boundary + if interval_minutes == INTERVAL_HOURLY and current_dt >= resolution_change_dt: + interval_minutes = INTERVAL_QUARTER_HOURLY + + _LOGGER_DETAILS.debug( + "Cache lookup for home %s: found %d intervals in range %s to %s", + self._home_id, + len(result), + start_time_iso, + end_time_iso, + ) + + return result + + def _add_intervals( + self, + intervals: list[dict[str, Any]], + fetch_time_iso: str, 
+ ) -> None: + """ + Add intervals as new fetch group to cache with GC. + + Strategy: + 1. Filter out duplicates (intervals already in cache) + 2. Handle "touch" (move cached intervals to new fetch group) + 3. Add new fetch group to cache + 4. Update timestamp index + 5. Run GC if needed + 6. Schedule debounced auto-save + + Args: + intervals: List of interval dicts from API. + fetch_time_iso: ISO timestamp string when intervals were fetched. + + """ + if not intervals: + return + + fetch_time_dt = datetime.fromisoformat(fetch_time_iso) + + # Classify intervals: new vs already cached + new_intervals = [] + intervals_to_touch = [] + + for interval in intervals: + starts_at_normalized = interval["startsAt"][:19] + if not self._index.contains(starts_at_normalized): + new_intervals.append(interval) + else: + intervals_to_touch.append((starts_at_normalized, interval)) + _LOGGER_DETAILS.debug( + "Interval %s already cached for home %s, will touch (update fetch time)", + interval["startsAt"], + self._home_id, + ) + + # Handle touched intervals: move to new fetch group + if intervals_to_touch: + self._touch_intervals(intervals_to_touch, fetch_time_dt) + + if not new_intervals: + if intervals_to_touch: + _LOGGER_DETAILS.debug( + "All %d intervals already cached for home %s (touched only)", + len(intervals), + self._home_id, + ) + return + + # Sort new intervals by startsAt + new_intervals.sort(key=lambda x: x["startsAt"]) + + # Add new fetch group to cache + fetch_group_index = self._cache.add_fetch_group(new_intervals, fetch_time_dt) + + # Update timestamp index for all new intervals + for interval_index, interval in enumerate(new_intervals): + starts_at_normalized = interval["startsAt"][:19] + self._index.add(interval, fetch_group_index, interval_index) + + _LOGGER_DETAILS.debug( + "Added fetch group %d to home %s cache: %d new intervals (fetched at %s)", + fetch_group_index, + self._home_id, + len(new_intervals), + fetch_time_iso, + ) + + # Run GC to evict old fetch 
groups if needed + gc_changed_data = self._gc.run_gc() + + # Schedule debounced auto-save if data changed + data_changed = len(new_intervals) > 0 or len(intervals_to_touch) > 0 or gc_changed_data + if data_changed and self._hass is not None and self._entry_id is not None: + self._schedule_debounced_save() + + def _touch_intervals( + self, + intervals_to_touch: list[tuple[str, dict[str, Any]]], + fetch_time_dt: datetime, + ) -> None: + """ + Move cached intervals to new fetch group (update fetch time). + + Creates a new fetch group containing references to existing intervals. + Updates the index to point to the new fetch group. + + Args: + intervals_to_touch: List of (normalized_timestamp, interval_dict) tuples. + fetch_time_dt: Datetime when intervals were fetched. + + """ + fetch_groups = self._cache.get_fetch_groups() + + # Create touch fetch group with existing interval references + touch_intervals = [] + for starts_at_normalized, _interval in intervals_to_touch: + # Get existing interval from old fetch group + location = self._index.get(starts_at_normalized) + if location is None: + continue # Should not happen, but be defensive + + old_group = fetch_groups[location["fetch_group_index"]] + existing_interval = old_group["intervals"][location["interval_index"]] + touch_intervals.append(existing_interval) + + # Add touch group to cache + touch_group_index = self._cache.add_fetch_group(touch_intervals, fetch_time_dt) + + # Update index to point to new fetch group + for interval_index, (starts_at_normalized, _) in enumerate(intervals_to_touch): + # Remove old index entry + self._index.remove(starts_at_normalized) + # Add new index entry pointing to touch group + interval = touch_intervals[interval_index] + self._index.add(interval, touch_group_index, interval_index) + + _LOGGER.debug( + "Touched %d cached intervals for home %s (moved to fetch group %d, fetched at %s)", + len(intervals_to_touch), + self._home_id, + touch_group_index, + fetch_time_dt.isoformat(), + ) 
+ + def _schedule_debounced_save(self) -> None: + """ + Schedule debounced save with configurable delay. + + Cancels existing timer and starts new one if already scheduled. + This prevents multiple saves during rapid successive changes. + + """ + # Cancel existing debounce timer if running + if self._save_debounce_task is not None and not self._save_debounce_task.done(): + self._save_debounce_task.cancel() + _LOGGER.debug("Cancelled pending auto-save (new changes detected, resetting timer)") + + # Schedule new debounced save + task = asyncio.create_task( + self._debounced_save_worker(), + name=f"interval_pool_debounce_{self._entry_id}", + ) + self._save_debounce_task = task + self._background_tasks.add(task) + task.add_done_callback(self._background_tasks.discard) + + async def _debounced_save_worker(self) -> None: + """Debounce worker: waits configured delay, then saves if not cancelled.""" + try: + await asyncio.sleep(DEBOUNCE_DELAY_SECONDS) + await self._auto_save_pool_state() + except asyncio.CancelledError: + _LOGGER.debug("Auto-save timer cancelled (expected - new changes arrived)") + raise + + async def _auto_save_pool_state(self) -> None: + """Auto-save pool state to storage with lock protection.""" + if self._hass is None or self._entry_id is None: + return + + async with self._save_lock: + try: + pool_state = self.to_dict() + await async_save_pool_state(self._hass, self._entry_id, pool_state) + _LOGGER.debug("Auto-saved interval pool for entry %s", self._entry_id) + except Exception: + _LOGGER.exception("Failed to auto-save interval pool for entry %s", self._entry_id) + + def to_dict(self) -> dict[str, Any]: + """ + Serialize interval pool state for storage. + + Filters out dead intervals (no longer referenced by index). + + Returns: + Dictionary containing serialized pool state (only living intervals). 
+ + """ + fetch_groups = self._cache.get_fetch_groups() + + # Serialize fetch groups (only living intervals) + serialized_fetch_groups = [] + + for group_idx, fetch_group in enumerate(fetch_groups): + living_intervals = [] + + for interval_idx, interval in enumerate(fetch_group["intervals"]): + starts_at_normalized = interval["startsAt"][:19] + + # Check if interval is still referenced in index + location = self._index.get(starts_at_normalized) + # Only keep if index points to THIS position in THIS group + if ( + location is not None + and location["fetch_group_index"] == group_idx + and location["interval_index"] == interval_idx + ): + living_intervals.append(interval) + + # Only serialize groups with living intervals + if living_intervals: + serialized_fetch_groups.append( + { + "fetched_at": fetch_group["fetched_at"].isoformat(), + "intervals": living_intervals, + } + ) + + return { + "version": 1, + "home_id": self._home_id, + "fetch_groups": serialized_fetch_groups, + } + + @classmethod + def from_dict( + cls, + data: dict[str, Any], + *, + api: TibberPricesApiClient, + hass: Any | None = None, + entry_id: str | None = None, + ) -> TibberPricesIntervalPool | None: + """ + Restore interval pool manager from storage. + + Expects single-home format: {"version": 1, "home_id": "...", "fetch_groups": [...]} + Old multi-home format is treated as corrupted and returns None. + + Args: + data: Dictionary containing serialized pool state. + api: API client for fetching intervals. + hass: HomeAssistant instance for auto-save (optional). + entry_id: Config entry ID for auto-save (optional). + + Returns: + Restored TibberPricesIntervalPool instance, or None if format unknown/corrupted. + + """ + # Validate format + if not data or "home_id" not in data or "fetch_groups" not in data: + if "homes" in data: + _LOGGER.info( + "Interval pool storage uses old multi-home format (pre-2025-11-25). " + "Treating as corrupted. Pool will rebuild from API." 
+ ) + else: + _LOGGER.warning("Interval pool storage format unknown or corrupted. Pool will rebuild from API.") + return None + + home_id = data["home_id"] + + # Create manager with home_id from storage + manager = cls(home_id=home_id, api=api, hass=hass, entry_id=entry_id) + + # Restore fetch groups to cache + for serialized_group in data.get("fetch_groups", []): + fetched_at_dt = datetime.fromisoformat(serialized_group["fetched_at"]) + intervals = serialized_group["intervals"] + fetch_group_index = manager._cache.add_fetch_group(intervals, fetched_at_dt) + + # Rebuild index for this fetch group + for interval_index, interval in enumerate(intervals): + manager._index.add(interval, fetch_group_index, interval_index) + + total_intervals = sum(len(group["intervals"]) for group in manager._cache.get_fetch_groups()) + _LOGGER.debug( + "Interval pool restored from storage (home %s, %d intervals)", + home_id, + total_intervals, + ) + + return manager diff --git a/custom_components/tibber_prices/interval_pool/routing.py b/custom_components/tibber_prices/interval_pool/routing.py new file mode 100644 index 0000000..3ec2ca3 --- /dev/null +++ b/custom_components/tibber_prices/interval_pool/routing.py @@ -0,0 +1,180 @@ +""" +Routing Module - API endpoint selection for price intervals. + +This module handles intelligent routing between different Tibber API endpoints: + +- PRICE_INFO: Recent data (from "day before yesterday midnight" onwards) +- PRICE_INFO_RANGE: Historical data (before "day before yesterday midnight") +- Automatic splitting and merging when range spans the boundary + +CRITICAL: Uses REAL TIME (dt_utils.now()) for API boundary calculation, +NOT TimeService.now() which may be shifted for internal simulation. 
+""" + +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING, Any + +from custom_components.tibber_prices.api.exceptions import TibberPricesApiClientError +from homeassistant.util import dt as dt_utils + +if TYPE_CHECKING: + from datetime import datetime + + from custom_components.tibber_prices.api.client import TibberPricesApiClient + +_LOGGER = logging.getLogger(__name__) +_LOGGER_DETAILS = logging.getLogger(__name__ + ".details") + + +async def get_price_intervals_for_range( + api_client: TibberPricesApiClient, + home_id: str, + user_data: dict[str, Any], + start_time: datetime, + end_time: datetime, +) -> list[dict[str, Any]]: + """ + Get price intervals for a specific time range with automatic routing. + + Automatically routes to the correct API endpoint based on the time range: + - PRICE_INFO_RANGE: For intervals exclusively before "day before yesterday midnight" (real time) + - PRICE_INFO: For intervals from "day before yesterday midnight" onwards + - Both: If range spans across the boundary, splits the request + + CRITICAL: Uses REAL TIME (dt_utils.now()) for API boundary calculation, + NOT TimeService.now() which may be shifted for internal simulation. + This ensures predictable API responses. + + CACHING STRATEGY: Returns ALL intervals from API response, NOT filtered. + The caller (pool.py) will cache everything and then filter to user request. + This maximizes cache efficiency - one API call can populate cache for + multiple subsequent queries. + + Args: + api_client: TibberPricesApiClient instance for API calls. + home_id: Home ID to fetch price data for. + user_data: User data dict containing home metadata (including timezone). + start_time: Start of the range (inclusive, timezone-aware). + end_time: End of the range (exclusive, timezone-aware). + + Returns: + List of ALL price interval dicts from API (unfiltered). 
+ - PRICE_INFO: Returns ~384 intervals (day-before-yesterday to tomorrow) + - PRICE_INFO_RANGE: Returns intervals for requested historical range + - Both: Returns all intervals from both endpoints + + Raises: + TibberPricesApiClientError: If arguments invalid or requests fail. + + """ + if not user_data: + msg = "User data required for timezone-aware price fetching - fetch user data first" + raise TibberPricesApiClientError(msg) + + if not home_id: + msg = "Home ID is required" + raise TibberPricesApiClientError(msg) + + if start_time >= end_time: + msg = f"Invalid time range: start_time ({start_time}) must be before end_time ({end_time})" + raise TibberPricesApiClientError(msg) + + # Calculate boundary: day before yesterday midnight (REAL TIME, not TimeService) + boundary = _calculate_boundary(api_client, user_data, home_id) + + _LOGGER_DETAILS.debug( + "Routing price interval request for home %s: range %s to %s, boundary %s", + home_id, + start_time, + end_time, + boundary, + ) + + # Route based on time range + if end_time <= boundary: + # Entire range is historical (before day before yesterday) → use PRICE_INFO_RANGE + _LOGGER_DETAILS.debug("Range is fully historical, using PRICE_INFO_RANGE") + result = await api_client.async_get_price_info_range( + home_id=home_id, + user_data=user_data, + start_time=start_time, + end_time=end_time, + ) + return result["price_info"] + + if start_time >= boundary: + # Entire range is recent (from day before yesterday onwards) → use PRICE_INFO + _LOGGER_DETAILS.debug("Range is fully recent, using PRICE_INFO") + result = await api_client.async_get_price_info(home_id, user_data) + + # Return ALL intervals (unfiltered) for maximum cache efficiency + # Pool will cache everything, then filter to user request + return result["price_info"] + + # Range spans boundary → split request + _LOGGER_DETAILS.debug("Range spans boundary, splitting request") + + # Fetch historical part (start_time to boundary) + historical_result = await 
api_client.async_get_price_info_range( + home_id=home_id, + user_data=user_data, + start_time=start_time, + end_time=boundary, + ) + + # Fetch recent part (boundary onwards) + recent_result = await api_client.async_get_price_info(home_id, user_data) + + # Return ALL intervals (unfiltered) for maximum cache efficiency + # Pool will cache everything, then filter to user request + return historical_result["price_info"] + recent_result["price_info"] + + +def _calculate_boundary( + api_client: TibberPricesApiClient, + user_data: dict[str, Any], + home_id: str, +) -> datetime: + """ + Calculate the API boundary (day before yesterday midnight). + + Uses the API client's helper method to extract timezone and calculate boundary. + + Args: + api_client: TibberPricesApiClient instance. + user_data: User data dict containing home metadata. + home_id: Home ID to get timezone for. + + Returns: + Timezone-aware datetime for day before yesterday midnight. + + """ + # Extract timezone for this home + home_timezones = api_client._extract_home_timezones(user_data) # noqa: SLF001 + home_tz = home_timezones.get(home_id) + + # Calculate boundary using API client's method + return api_client._calculate_day_before_yesterday_midnight(home_tz) # noqa: SLF001 + + +def _parse_timestamp(timestamp_str: str) -> datetime: + """ + Parse ISO timestamp string to timezone-aware datetime. + + Args: + timestamp_str: ISO format timestamp string. + + Returns: + Timezone-aware datetime object. + + Raises: + ValueError: If timestamp string cannot be parsed. 
"""Storage management for interval pool."""

from __future__ import annotations

import errno
import logging
from typing import TYPE_CHECKING, Any

from homeassistant.helpers.storage import Store

if TYPE_CHECKING:
    from homeassistant.core import HomeAssistant

_LOGGER = logging.getLogger(__name__)
_LOGGER_DETAILS = logging.getLogger(__name__ + ".details")

# Storage version - increment when changing data structure
INTERVAL_POOL_STORAGE_VERSION = 1


def get_storage_key(entry_id: str) -> str:
    """
    Build the storage key for a config entry's interval pool.

    Args:
        entry_id: Home Assistant config entry ID

    Returns:
        Storage key string

    """
    return f"tibber_prices.interval_pool.{entry_id}"


async def async_load_pool_state(
    hass: HomeAssistant,
    entry_id: str,
) -> dict[str, Any] | None:
    """
    Load interval pool state from storage.

    Args:
        hass: Home Assistant instance
        entry_id: Config entry ID

    Returns:
        Pool state dict or None if no cache exists

    """
    pool_store: Store = Store(hass, INTERVAL_POOL_STORAGE_VERSION, get_storage_key(entry_id))

    try:
        payload = await pool_store.async_load()
    except Exception:
        # Corrupted file / JSON parse error: log and start fresh rather than
        # fail integration setup.
        _LOGGER.exception(
            "Failed to load interval pool storage for entry %s (corrupted file?), starting with empty pool",
            entry_id,
        )
        return None

    if payload is None:
        _LOGGER.debug("No interval pool cache found for entry %s (first run)", entry_id)
        return None

    if not isinstance(payload, dict):
        _LOGGER.warning(
            "Invalid interval pool storage structure for entry %s (not a dict), ignoring",
            entry_id,
        )
        return None

    # Accepted: current single-home format (home_id + fetch_groups present).
    if "home_id" in payload and "fetch_groups" in payload:
        _LOGGER.debug(
            "Interval pool state loaded for entry %s (single-home format, %d fetch groups)",
            entry_id,
            len(payload.get("fetch_groups", [])),
        )
        return payload

    # Rejected: legacy multi-home layout keyed by "homes".
    if "homes" in payload:
        _LOGGER.info(
            "Interval pool storage for entry %s uses old multi-home format (pre-2025-11-25). "
            "Treating as incompatible. Pool will rebuild from API.",
            entry_id,
        )
        return None

    # Rejected: any other shape is unknown.
    _LOGGER.warning(
        "Invalid interval pool storage structure for entry %s (missing required keys), ignoring",
        entry_id,
    )
    return None


async def async_save_pool_state(
    hass: HomeAssistant,
    entry_id: str,
    pool_state: dict[str, Any],
) -> None:
    """
    Persist interval pool state to storage.

    Best-effort: OS-level failures are logged (with an errno-specific hint
    where possible) but never propagated to the caller.

    Args:
        hass: Home Assistant instance
        entry_id: Config entry ID
        pool_state: Pool state dict to save

    """
    pool_store: Store = Store(hass, INTERVAL_POOL_STORAGE_VERSION, get_storage_key(entry_id))

    try:
        await pool_store.async_save(pool_state)
    except OSError as os_err:
        # Translate the common errno values into actionable messages.
        if os_err.errno == errno.ENOSPC:
            _LOGGER.exception(
                "Cannot save interval pool storage for entry %s: Disk full!",
                entry_id,
            )
        elif os_err.errno == errno.EACCES:
            _LOGGER.exception(
                "Cannot save interval pool storage for entry %s: Permission denied!",
                entry_id,
            )
        else:
            _LOGGER.exception(
                "Failed to save interval pool storage for entry %s",
                entry_id,
            )
    else:
        _LOGGER_DETAILS.debug(
            "Interval pool state saved for entry %s (%d fetch groups)",
            entry_id,
            len(pool_state.get("fetch_groups", [])),
        )


async def async_remove_pool_storage(
    hass: HomeAssistant,
    entry_id: str,
) -> None:
    """
    Delete the interval pool storage file.

    Used when the config entry is removed; failures are logged, not raised.

    Args:
        hass: Home Assistant instance
        entry_id: Config entry ID

    """
    pool_store: Store = Store(hass, INTERVAL_POOL_STORAGE_VERSION, get_storage_key(entry_id))

    try:
        await pool_store.async_remove()
    except OSError as ex:
        _LOGGER.warning("Failed to remove interval pool storage for entry %s: %s", entry_id, ex)
    else:
        _LOGGER.debug("Interval pool storage removed for entry %s", entry_id)