fix(coordinator): track API calls separately from cached data usage

The lifecycle sensor was always showing "fresh" state because
_last_price_update was set on every coordinator update, regardless of
whether data came from API or cache.

Changes:
- interval_pool/manager.py: get_intervals() and get_sensor_data() now
  return tuple[data, bool] where bool indicates actual API call
- coordinator/price_data_manager.py: All fetch methods propagate
  api_called flag through the call chain
- coordinator/core.py: Only update _last_price_update when api_called=True,
  added debug logging to distinguish API calls from cached data
- services/get_price.py: Updated to handle new tuple return type

Impact: Lifecycle sensor now correctly shows "cached" during normal
15-minute updates (using pool cache) and only "fresh" within 5 minutes
of actual API calls. This fixes the issue where the sensor would never
leave the "fresh" state during frequent HA restarts or normal operation.
This commit is contained in:
Julian Pawlowski 2025-12-25 18:53:29 +00:00
parent 81ebfb4916
commit 23b4330b9a
4 changed files with 87 additions and 39 deletions

View file

@ -608,7 +608,7 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
# Get current price info to check if tomorrow data already exists
current_price_info = self.data.get("priceInfo", []) if self.data else []
result = await self._price_data_manager.handle_main_entry_update(
result, api_called = await self._price_data_manager.handle_main_entry_update(
current_time,
self._home_id,
self._transform_data,
@ -621,12 +621,22 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
# Sync user_data cache (price data is in IntervalPool)
self._cached_user_data = self._price_data_manager.cached_user_data
# Update lifecycle tracking - Pool decides if API was called
# We track based on result having data
if result and "priceInfo" in result and len(result["priceInfo"]) > 0:
self._last_price_update = current_time # Track when data was fetched
# Update lifecycle tracking - ONLY if API was actually called
# (not when returning cached data)
if api_called and result and "priceInfo" in result and len(result["priceInfo"]) > 0:
self._last_price_update = current_time # Track when data was fetched from API
self._api_calls_today += 1
self._lifecycle_state = "fresh" # Data just fetched
_LOGGER.debug(
"API call completed: Fetched %d intervals, updating lifecycle to 'fresh'",
len(result["priceInfo"]),
)
elif not api_called:
# Using cached data - lifecycle stays as is (cached/searching_tomorrow/etc.)
_LOGGER.debug(
"Using cached data: %d intervals from pool, no API call made",
len(result.get("priceInfo", [])),
)
except (
TibberPricesApiClientAuthenticationError,
TibberPricesApiClientCommunicationError,

View file

@ -287,7 +287,7 @@ class TibberPricesPriceDataManager:
current_time: datetime,
*,
include_tomorrow: bool = True,
) -> dict[str, Any]:
) -> tuple[dict[str, Any], bool]:
"""
Fetch data for a single home via pool.
@ -297,15 +297,23 @@ class TibberPricesPriceDataManager:
include_tomorrow: If True, request tomorrow's data too. If False,
only request up to end of today.
Returns:
Tuple of (data_dict, api_called):
- data_dict: Dictionary with timestamp, home_id, price_info, currency.
- api_called: True if API was called to fetch missing data.
"""
if not home_id:
self._log("warning", "No home ID provided - cannot fetch price data")
return {
"timestamp": current_time,
"home_id": "",
"price_info": [],
"currency": "EUR",
}
return (
{
"timestamp": current_time,
"home_id": "",
"price_info": [],
"currency": "EUR",
},
False, # No API call made
)
# Ensure we have user_data before fetching price data
# This is critical for timezone-aware cursor calculation
@ -336,26 +344,35 @@ class TibberPricesPriceDataManager:
raise TibberPricesApiClientError(msg)
# Retrieve price data via IntervalPool (single source of truth)
price_info = await self._fetch_via_pool(home_id, include_tomorrow=include_tomorrow)
price_info, api_called = await self._fetch_via_pool(home_id, include_tomorrow=include_tomorrow)
# Extract currency for this home from user_data
currency = self._get_currency_for_home(home_id)
self._log("debug", "Successfully fetched data for home %s (%d intervals)", home_id, len(price_info))
self._log(
"debug",
"Successfully fetched data for home %s (%d intervals, api_called=%s)",
home_id,
len(price_info),
api_called,
)
return {
"timestamp": current_time,
"home_id": home_id,
"price_info": price_info,
"currency": currency,
}
return (
{
"timestamp": current_time,
"home_id": home_id,
"price_info": price_info,
"currency": currency,
},
api_called,
)
async def _fetch_via_pool(
self,
home_id: str,
*,
include_tomorrow: bool = True,
) -> list[dict[str, Any]]:
) -> tuple[list[dict[str, Any]], bool]:
"""
Retrieve price data via IntervalPool.
@ -375,12 +392,14 @@ class TibberPricesPriceDataManager:
tomorrow data yet.
Returns:
List of price interval dicts.
Tuple of (intervals, api_called):
- intervals: List of price interval dicts.
- api_called: True if API was called to fetch missing data.
"""
# user_data is guaranteed by fetch_home_data(), but needed for type narrowing
if self._cached_user_data is None:
return []
return [], False # No data, no API call
self._log(
"debug",
@ -388,12 +407,14 @@ class TibberPricesPriceDataManager:
home_id,
include_tomorrow,
)
return await self._interval_pool.get_sensor_data(
intervals, api_called = await self._interval_pool.get_sensor_data(
api_client=self.api,
user_data=self._cached_user_data,
include_tomorrow=include_tomorrow,
)
return intervals, api_called
def _get_currency_for_home(self, home_id: str) -> str:
"""
Get currency for a specific home from cached user_data.
@ -463,7 +484,7 @@ class TibberPricesPriceDataManager:
transform_fn: Callable[[dict[str, Any]], dict[str, Any]],
*,
current_price_info: list[dict[str, Any]] | None = None,
) -> dict[str, Any]:
) -> tuple[dict[str, Any], bool]:
"""
Handle update for main entry - fetch data for this home.
@ -485,6 +506,11 @@ class TibberPricesPriceDataManager:
current_price_info: Current price intervals (from coordinator.data["priceInfo"]).
Used to check if tomorrow data already exists.
Returns:
Tuple of (transformed_data, api_called):
- transformed_data: Transformed data dict for coordinator.
- api_called: True if API was called to fetch missing data.
"""
# Update user data if needed (daily check)
user_data_updated = await self.update_user_data_if_needed(current_time)
@ -497,7 +523,7 @@ class TibberPricesPriceDataManager:
# Return a special marker in the result that coordinator can check
result = transform_fn({})
result["_home_not_found"] = True # Special marker for coordinator
return result
return result, False # No API call made (home doesn't exist)
# Determine if we should request tomorrow data
include_tomorrow = self.should_fetch_tomorrow_data(current_price_info)
@ -509,7 +535,7 @@ class TibberPricesPriceDataManager:
home_id,
include_tomorrow,
)
raw_data = await self.fetch_home_data(home_id, current_time, include_tomorrow=include_tomorrow)
raw_data, api_called = await self.fetch_home_data(home_id, current_time, include_tomorrow=include_tomorrow)
# Parse timestamps immediately after fetch
raw_data = helpers.parse_all_timestamps(raw_data, time=self.time)
@ -519,7 +545,7 @@ class TibberPricesPriceDataManager:
await self.store_cache()
# Transform for main entry
return transform_fn(raw_data)
return transform_fn(raw_data), api_called
async def handle_api_error(
self,

View file

@ -118,7 +118,7 @@ class TibberPricesIntervalPool:
user_data: dict[str, Any],
start_time: datetime,
end_time: datetime,
) -> list[dict[str, Any]]:
) -> tuple[list[dict[str, Any]], bool]:
"""
Get price intervals for time range (cached + fetch missing).
@ -139,8 +139,10 @@ class TibberPricesIntervalPool:
end_time: End of range (exclusive, timezone-aware).
Returns:
List of price interval dicts, sorted by startsAt.
Contains ALL intervals in requested range (cached + fetched).
Tuple of (intervals, api_called):
- intervals: List of price interval dicts, sorted by startsAt.
Contains ALL intervals in requested range (cached + fetched).
- api_called: True if API was called to fetch missing data, False if all from cache.
Raises:
TibberPricesApiClientError: If API calls fail or validation errors.
@ -200,15 +202,19 @@ class TibberPricesIntervalPool:
# This ensures we return exactly what user requested, filtering out extra intervals
final_result = self._get_cached_intervals(start_time_iso, end_time_iso)
# Track if API was called (True if any missing ranges were fetched)
api_called = len(missing_ranges) > 0
_LOGGER_DETAILS.debug(
"Pool returning %d intervals for home %s (from cache: %d, fetched from API: %d ranges)",
"Pool returning %d intervals for home %s (from cache: %d, fetched from API: %d ranges, api_called=%s)",
len(final_result),
self._home_id,
len(cached_intervals),
len(missing_ranges),
api_called,
)
return final_result
return final_result, api_called
async def get_sensor_data(
self,
@ -217,7 +223,7 @@ class TibberPricesIntervalPool:
home_timezone: str | None = None,
*,
include_tomorrow: bool = True,
) -> list[dict[str, Any]]:
) -> tuple[list[dict[str, Any]], bool]:
"""
Get price intervals for sensor data (day-before-yesterday to end-of-tomorrow).
@ -247,8 +253,10 @@ class TibberPricesIntervalPool:
DOES NOT affect returned data - always returns full range.
Returns:
List of price interval dicts for the 4-day window (including any cached
tomorrow data), sorted by startsAt.
Tuple of (intervals, api_called):
- intervals: List of price interval dicts for the 4-day window (including any cached
tomorrow data), sorted by startsAt.
- api_called: True if API was called to fetch missing data, False if all from cache.
"""
# Determine timezone
@ -286,7 +294,7 @@ class TibberPricesIntervalPool:
)
# Fetch data (may be partial if include_tomorrow=False)
await self.get_intervals(
_intervals, api_called = await self.get_intervals(
api_client=api_client,
user_data=user_data,
start_time=day_before_yesterday,
@ -295,11 +303,13 @@ class TibberPricesIntervalPool:
# Return FULL protected range (including any cached tomorrow data)
# This ensures cached tomorrow data is available even when include_tomorrow=False
return self._get_cached_intervals(
final_intervals = self._get_cached_intervals(
day_before_yesterday.isoformat(),
end_of_tomorrow.isoformat(),
)
return final_intervals, api_called
def get_pool_stats(self) -> dict[str, Any]:
"""
Get statistics about the interval pool.

View file

@ -145,12 +145,14 @@ async def handle_get_price(call: ServiceCall) -> ServiceResponse:
# Call the interval pool to get intervals (with intelligent caching)
# Single-home architecture: pool knows its home_id, no parameter needed
price_info = await pool.get_intervals(
price_info, _api_called = await pool.get_intervals(
api_client=api_client,
user_data=user_data,
start_time=start_time,
end_time=end_time,
)
# Note: We ignore api_called flag here - service always returns requested data
# regardless of whether it came from cache or was fetched fresh from API
except Exception as error:
_LOGGER.exception("Error fetching price data")