hass.tibber_prices/custom_components/tibber_prices/diagnostics.py
Julian Pawlowski 78df8a4b17 refactor(lifecycle): integrate with Pool for sensor metrics
Replace cache-based metrics with Pool as single source of truth:
- get_cache_age_minutes() → get_sensor_fetch_age_minutes() (from Pool)
- Remove get_cache_validity_status(), get_data_completeness_status()
- Add get_pool_stats() for comprehensive pool statistics
- Add has_tomorrow_data() using Pool as source

Attributes now show:
- sensor_intervals_count/expected/has_gaps (protected range)
- cache_intervals_total/limit/fill_percent/extra (entire pool)
- last_sensor_fetch, cache_oldest/newest_interval timestamps
- tomorrow_available based on Pool state

Impact: More accurate lifecycle status, consistent with Pool as source
of truth, cleaner diagnostic information.
2025-12-23 14:13:34 +00:00

80 lines
3.3 KiB
Python

"""
Diagnostics support for tibber_prices.
Learn more about diagnostics:
https://developers.home-assistant.io/docs/core/integration_diagnostics
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from homeassistant.core import HomeAssistant
from .data import TibberPricesConfigEntry
async def async_get_config_entry_diagnostics(
hass: HomeAssistant, # noqa: ARG001
entry: TibberPricesConfigEntry,
) -> dict[str, Any]:
"""Return diagnostics for a config entry."""
coordinator = entry.runtime_data.coordinator
# Get period metadata from coordinator data
price_periods = coordinator.data.get("pricePeriods", {}) if coordinator.data else {}
return {
"entry": {
"entry_id": entry.entry_id,
"version": entry.version,
"minor_version": entry.minor_version,
"domain": entry.domain,
"title": entry.title,
"state": str(entry.state),
"home_id": entry.data.get("home_id", ""),
},
"coordinator": {
"last_update_success": coordinator.last_update_success,
"update_interval": str(coordinator.update_interval),
"data": coordinator.data,
"update_timestamps": {
"price": coordinator._last_price_update.isoformat() if coordinator._last_price_update else None, # noqa: SLF001
"user": coordinator._last_user_update.isoformat() if coordinator._last_user_update else None, # noqa: SLF001
"last_coordinator_update": coordinator._last_coordinator_update.isoformat() # noqa: SLF001
if coordinator._last_coordinator_update # noqa: SLF001
else None,
},
"lifecycle": {
"state": coordinator._lifecycle_state, # noqa: SLF001
"is_fetching": coordinator._is_fetching, # noqa: SLF001
"api_calls_today": coordinator._api_calls_today, # noqa: SLF001
"last_api_call_date": coordinator._last_api_call_date.isoformat() # noqa: SLF001
if coordinator._last_api_call_date # noqa: SLF001
else None,
},
},
"periods": {
"best_price": {
"count": len(price_periods.get("best_price", {}).get("periods", [])),
"metadata": price_periods.get("best_price", {}).get("metadata", {}),
},
"peak_price": {
"count": len(price_periods.get("peak_price", {}).get("periods", [])),
"metadata": price_periods.get("peak_price", {}).get("metadata", {}),
},
},
"config": {
"options": dict(entry.options),
},
"cache_status": {
"user_data_cached": coordinator._cached_user_data is not None, # noqa: SLF001
"has_price_data": coordinator.data is not None and "priceInfo" in (coordinator.data or {}),
"transformer_cache_valid": coordinator._data_transformer._cached_transformed_data is not None, # noqa: SLF001
"period_calculator_cache_valid": coordinator._period_calculator._cached_periods is not None, # noqa: SLF001
},
"error": {
"last_exception": str(coordinator.last_exception) if coordinator.last_exception else None,
},
}