mirror of
https://github.com/jpawlowski/hass.tibber_prices.git
synced 2026-03-30 05:13:40 +00:00
Changed from centralized main+subentry coordinator pattern to independent
coordinators per home. Each config entry now manages its own home data
with its own API client and access token.
Architecture changes:
- API Client: async_get_price_info() changed from home_ids: set[str] to home_id: str
* Removed GraphQL alias pattern (home0, home1, ...)
* Single-home query structure without aliasing
* Simplified response parsing (viewer.home instead of viewer.home0)
- Coordinator: Removed main/subentry distinction
* Deleted is_main_entry() and _has_existing_main_coordinator()
* Each coordinator fetches its own data independently
* Removed _find_main_coordinator() and _get_configured_home_ids()
* Simplified _async_update_data() - no subentry logic
* Added _home_id instance variable from config_entry.data
- __init__.py: New _get_access_token() helper
* Handles token retrieval for both parent and subentries
* Subentries find parent entry to get shared access token
* Creates single API client instance per coordinator
- Data structures: Flat single-home format
* Old: {"homes": {home_id: {"price_info": [...]}}}
* New: {"home_id": str, "price_info": [...], "currency": str}
* Attribute name: "periods" → "pricePeriods" (consistent with priceInfo)
- helpers.py: Removed get_configured_home_ids() (no longer needed)
* parse_all_timestamps() updated for single-home structure
Impact: Each home operates independently with its own lifecycle tracking,
caching, and period calculations. Simpler architecture, easier debugging,
better isolation between homes.
247 lines
10 KiB
Python
247 lines
10 KiB
Python
"""Helper functions for API response processing."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from typing import TYPE_CHECKING
|
|
|
|
from homeassistant.const import __version__ as ha_version
|
|
|
|
if TYPE_CHECKING:
|
|
import aiohttp
|
|
|
|
from .queries import TibberPricesQueryType
|
|
|
|
from .exceptions import (
|
|
TibberPricesApiClientAuthenticationError,
|
|
TibberPricesApiClientError,
|
|
TibberPricesApiClientPermissionError,
|
|
)
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
HTTP_BAD_REQUEST = 400
|
|
HTTP_UNAUTHORIZED = 401
|
|
HTTP_FORBIDDEN = 403
|
|
HTTP_TOO_MANY_REQUESTS = 429
|
|
|
|
|
|
def verify_response_or_raise(response: aiohttp.ClientResponse) -> None:
|
|
"""Verify that the response is valid."""
|
|
if response.status == HTTP_UNAUTHORIZED:
|
|
_LOGGER.error("Tibber API authentication failed - check access token")
|
|
raise TibberPricesApiClientAuthenticationError(TibberPricesApiClientAuthenticationError.INVALID_CREDENTIALS)
|
|
if response.status == HTTP_FORBIDDEN:
|
|
_LOGGER.error("Tibber API access forbidden - insufficient permissions")
|
|
raise TibberPricesApiClientPermissionError(TibberPricesApiClientPermissionError.INSUFFICIENT_PERMISSIONS)
|
|
if response.status == HTTP_TOO_MANY_REQUESTS:
|
|
# Check for Retry-After header that Tibber might send
|
|
retry_after = response.headers.get("Retry-After", "unknown")
|
|
_LOGGER.warning("Tibber API rate limit exceeded - retry after %s seconds", retry_after)
|
|
raise TibberPricesApiClientError(TibberPricesApiClientError.RATE_LIMIT_ERROR.format(retry_after=retry_after))
|
|
if response.status == HTTP_BAD_REQUEST:
|
|
_LOGGER.error("Tibber API rejected request - likely invalid GraphQL query")
|
|
raise TibberPricesApiClientError(
|
|
TibberPricesApiClientError.INVALID_QUERY_ERROR.format(message="Bad request - likely invalid GraphQL query")
|
|
)
|
|
response.raise_for_status()
|
|
|
|
|
|
async def verify_graphql_response(response_json: dict, query_type: TibberPricesQueryType) -> None:
|
|
"""Verify the GraphQL response for errors and data completeness, including empty data."""
|
|
if "errors" in response_json:
|
|
errors = response_json["errors"]
|
|
if not errors:
|
|
_LOGGER.error("Tibber API returned empty errors array")
|
|
raise TibberPricesApiClientError(TibberPricesApiClientError.UNKNOWN_ERROR)
|
|
|
|
error = errors[0] # Take first error
|
|
if not isinstance(error, dict):
|
|
_LOGGER.error("Tibber API returned malformed error: %s", error)
|
|
raise TibberPricesApiClientError(TibberPricesApiClientError.MALFORMED_ERROR.format(error=error))
|
|
|
|
message = error.get("message", "Unknown error")
|
|
extensions = error.get("extensions", {})
|
|
error_code = extensions.get("code")
|
|
|
|
# Handle specific Tibber API error codes
|
|
if error_code == "UNAUTHENTICATED":
|
|
_LOGGER.error("Tibber API authentication error: %s", message)
|
|
raise TibberPricesApiClientAuthenticationError(TibberPricesApiClientAuthenticationError.INVALID_CREDENTIALS)
|
|
if error_code == "FORBIDDEN":
|
|
_LOGGER.error("Tibber API permission error: %s", message)
|
|
raise TibberPricesApiClientPermissionError(TibberPricesApiClientPermissionError.INSUFFICIENT_PERMISSIONS)
|
|
if error_code in ["RATE_LIMITED", "TOO_MANY_REQUESTS"]:
|
|
# Some GraphQL APIs return rate limit info in extensions
|
|
retry_after = extensions.get("retryAfter", "unknown")
|
|
_LOGGER.warning(
|
|
"Tibber API rate limited via GraphQL: %s (retry after %s)",
|
|
message,
|
|
retry_after,
|
|
)
|
|
raise TibberPricesApiClientError(
|
|
TibberPricesApiClientError.RATE_LIMIT_ERROR.format(retry_after=retry_after)
|
|
)
|
|
if error_code in ["VALIDATION_ERROR", "GRAPHQL_VALIDATION_FAILED"]:
|
|
_LOGGER.error("Tibber API validation error: %s", message)
|
|
raise TibberPricesApiClientError(TibberPricesApiClientError.INVALID_QUERY_ERROR.format(message=message))
|
|
|
|
_LOGGER.error("Tibber API GraphQL error (code: %s): %s", error_code or "unknown", message)
|
|
raise TibberPricesApiClientError(TibberPricesApiClientError.GRAPHQL_ERROR.format(message=message))
|
|
|
|
if "data" not in response_json or response_json["data"] is None:
|
|
_LOGGER.error("Tibber API response missing data object")
|
|
raise TibberPricesApiClientError(
|
|
TibberPricesApiClientError.GRAPHQL_ERROR.format(message="Response missing data object")
|
|
)
|
|
|
|
# Empty data check (for retry logic) - always check, regardless of query_type
|
|
if is_data_empty(response_json["data"], query_type.value):
|
|
_LOGGER.debug("Empty data detected for query_type: %s", query_type)
|
|
raise TibberPricesApiClientError(
|
|
TibberPricesApiClientError.EMPTY_DATA_ERROR.format(query_type=query_type.value)
|
|
)
|
|
|
|
|
|
def is_data_empty(data: dict, query_type: str) -> bool:
|
|
"""
|
|
Check if the response data is empty or incomplete.
|
|
|
|
For viewer data:
|
|
- Must have userId and homes
|
|
- If either is missing, data is considered empty
|
|
- If homes is empty, data is considered empty
|
|
- If userId is None, data is considered empty
|
|
|
|
For price info:
|
|
- Must have range data
|
|
- Must have today data
|
|
- tomorrow can be empty if we have valid historical and today data
|
|
"""
|
|
_LOGGER.debug("Checking if data is empty for query_type %s", query_type)
|
|
|
|
is_empty = False
|
|
try:
|
|
if query_type == "user":
|
|
has_user_id = (
|
|
"viewer" in data
|
|
and isinstance(data["viewer"], dict)
|
|
and "userId" in data["viewer"]
|
|
and data["viewer"]["userId"] is not None
|
|
)
|
|
has_homes = (
|
|
"viewer" in data
|
|
and isinstance(data["viewer"], dict)
|
|
and "homes" in data["viewer"]
|
|
and isinstance(data["viewer"]["homes"], list)
|
|
and len(data["viewer"]["homes"]) > 0
|
|
)
|
|
is_empty = not has_user_id or not has_homes
|
|
_LOGGER.debug(
|
|
"Viewer check - has_user_id: %s, has_homes: %s, is_empty: %s",
|
|
has_user_id,
|
|
has_homes,
|
|
is_empty,
|
|
)
|
|
|
|
elif query_type == "price_info":
|
|
# Check for single home data (viewer.home)
|
|
viewer = data.get("viewer", {})
|
|
home_data = viewer.get("home")
|
|
|
|
if not home_data:
|
|
_LOGGER.debug("No home data found in price_info response")
|
|
is_empty = True
|
|
else:
|
|
_LOGGER.debug("Checking price_info for single home")
|
|
|
|
if not home_data or "currentSubscription" not in home_data or home_data["currentSubscription"] is None:
|
|
_LOGGER.debug("Missing currentSubscription in home")
|
|
is_empty = True
|
|
else:
|
|
subscription = home_data["currentSubscription"]
|
|
|
|
# Check priceInfoRange (96 quarter-hourly intervals)
|
|
has_yesterday = (
|
|
"priceInfoRange" in subscription
|
|
and subscription["priceInfoRange"] is not None
|
|
and "edges" in subscription["priceInfoRange"]
|
|
and subscription["priceInfoRange"]["edges"]
|
|
)
|
|
|
|
# Check priceInfo for today's data
|
|
has_price_info = "priceInfo" in subscription and subscription["priceInfo"] is not None
|
|
has_today = (
|
|
has_price_info
|
|
and "today" in subscription["priceInfo"]
|
|
and subscription["priceInfo"]["today"] is not None
|
|
and len(subscription["priceInfo"]["today"]) > 0
|
|
)
|
|
|
|
# Data is empty if we don't have historical data or today's data
|
|
is_empty = not has_yesterday or not has_today
|
|
|
|
_LOGGER.debug(
|
|
"Price info check - priceInfoRange: %s, today: %s, is_empty: %s",
|
|
bool(has_yesterday),
|
|
bool(has_today),
|
|
is_empty,
|
|
)
|
|
|
|
else:
|
|
_LOGGER.debug("Unknown query type %s, treating as non-empty", query_type)
|
|
is_empty = False
|
|
except (KeyError, IndexError, TypeError) as error:
|
|
_LOGGER.debug("Error checking data emptiness: %s", error)
|
|
is_empty = True
|
|
|
|
return is_empty
|
|
|
|
|
|
def prepare_headers(access_token: str, version: str) -> dict[str, str]:
|
|
"""Prepare headers for API request."""
|
|
return {
|
|
"Authorization": f"Bearer {access_token}",
|
|
"Accept": "application/json",
|
|
"User-Agent": f"HomeAssistant/{ha_version} tibber_prices/{version}",
|
|
}
|
|
|
|
|
|
def flatten_price_info(subscription: dict) -> list[dict]:
|
|
"""
|
|
Transform and flatten priceInfo from full API data structure.
|
|
|
|
Returns a flat list of all price intervals ordered as:
|
|
[day_before_yesterday_prices, yesterday_prices, today_prices, tomorrow_prices]
|
|
|
|
priceInfoRange fetches 192 quarter-hourly intervals starting from the day before
|
|
yesterday midnight (2 days of historical data), which provides sufficient data
|
|
for calculating trailing 24h averages for all intervals including yesterday.
|
|
|
|
Args:
|
|
subscription: The currentSubscription dictionary from API response.
|
|
|
|
Returns:
|
|
A flat list containing all price dictionaries (startsAt, total, level).
|
|
|
|
"""
|
|
price_info_range = subscription.get("priceInfoRange", {})
|
|
|
|
# Transform priceInfoRange edges data (extract historical quarter-hourly prices)
|
|
# This contains 192 intervals (2 days) starting from day before yesterday midnight
|
|
historical_prices = []
|
|
if "edges" in price_info_range:
|
|
edges = price_info_range["edges"]
|
|
|
|
for edge in edges:
|
|
if "node" not in edge:
|
|
_LOGGER.debug("Skipping edge without node: %s", edge)
|
|
continue
|
|
historical_prices.append(edge["node"])
|
|
|
|
# Return all intervals as a single flattened array
|
|
return (
|
|
historical_prices
|
|
+ subscription.get("priceInfo", {}).get("today", [])
|
|
+ subscription.get("priceInfo", {}).get("tomorrow", [])
|
|
)
|