feat(sensor): add data lifecycle diagnostic sensor with push updates

Add comprehensive data_lifecycle_status sensor showing real-time cache
vs fresh API data status with 6 states and 13+ detailed attributes.

Key features:
- 6 lifecycle states: cached, fresh, refreshing, searching_tomorrow,
  turnover_pending, error
- Push-update system for instant state changes (refreshing→fresh→error)
- Quarter-hour polling for turnover_pending detection at 23:45
- Accurate next_api_poll prediction using Timer #1 offset tracking
- Tomorrow prediction with actual timer schedule (not fixed 13:00)
- 13+ formatted attributes: cache_age, data_completeness, api_calls_today,
  next_api_poll, etc.

Implementation:
- sensor/calculators/lifecycle.py: New calculator with state logic
- sensor/attributes/lifecycle.py: Attribute builders with formatting
- coordinator/core.py: Lifecycle tracking + callback system (+16 lines)
- sensor/core.py: Push callback registration (+3 lines)
- coordinator/constants.py: Added to TIME_SENSITIVE_ENTITY_KEYS
- Translations: All 5 languages (de, en, nb, nl, sv)

Timing optimization:
- Extended turnover warning: 5min → 15min (catches 23:45 quarter boundary)
- No minute-timer needed: quarter-hour updates + push = optimal
- Push-updates: <1sec latency for refreshing/fresh/error states
- Timer offset tracking: Accurate tomorrow predictions

Removed obsolete sensors:
- data_timestamp (replaced by lifecycle attributes)
- price_forecast (never implemented, removed from definitions)

Impact: Users can monitor data freshness, API call patterns, cache age,
and understand integration behavior. Perfect for troubleshooting and
visibility into when data updates occur.
This commit is contained in:
Julian Pawlowski 2025-11-20 15:12:41 +00:00
parent 02935c8d72
commit 189d3ba84d
20 changed files with 584 additions and 202 deletions

View file

@ -84,6 +84,8 @@ TIME_SENSITIVE_ENTITY_KEYS = frozenset(
"best_price_next_start_time",
"peak_price_end_time",
"peak_price_next_start_time",
# Lifecycle sensor (needs quarter-hour updates for turnover_pending detection at 23:45)
"data_lifecycle_status",
}
)

View file

@ -13,6 +13,7 @@ from homeassistant.helpers.storage import Store
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
if TYPE_CHECKING:
from collections.abc import Callable
from datetime import date, datetime
from homeassistant.config_entries import ConfigEntry
@ -44,6 +45,9 @@ from .time_service import TibberPricesTimeService
_LOGGER = logging.getLogger(__name__)
# Lifecycle state transition thresholds
FRESH_TO_CACHED_SECONDS = 300 # 5 minutes
# =============================================================================
# TIMER SYSTEM - Three independent update mechanisms:
# =============================================================================
@ -90,14 +94,15 @@ _LOGGER = logging.getLogger(__name__)
# Midnight Turnover Coordination:
# - Both Timer #1 and Timer #2 check for midnight turnover
# - Atomic check: _check_midnight_turnover_needed(now)
# Returns True if current_date > _last_midnight_check.date()
# Returns True if current_date > _last_midnight_turnover_check.date()
# Returns False if already done today
# - Whoever runs first (Timer #1 or Timer #2) performs turnover:
# Calls _perform_midnight_data_rotation(now)
# Updates _last_midnight_check to current time
# Updates _last_midnight_turnover_check and _last_actual_turnover to current time
# - The other timer sees turnover already done and skips
# - No locks needed - date comparison is naturally atomic
# - No race condition possible - Python datetime.date() comparison is thread-safe
# - _last_transformation_time is separate and tracks when data was last transformed (for cache)
#
# =============================================================================
@ -173,7 +178,19 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
self._last_price_update: datetime | None = None
self._cached_transformed_data: dict[str, Any] | None = None
self._last_transformation_config: dict[str, Any] | None = None
self._last_midnight_check: datetime | None = None
self._last_transformation_time: datetime | None = None # When data was last transformed (for cache)
self._last_midnight_turnover_check: datetime | None = None # Last midnight turnover detection check
self._last_actual_turnover: datetime | None = None # When midnight turnover actually happened
# Data lifecycle tracking for diagnostic sensor
self._lifecycle_state: str = (
"cached" # Current state: cached, fresh, refreshing, searching_tomorrow, turnover_pending, error
)
self._api_calls_today: int = 0 # Counter for API calls today
self._last_api_call_date: date | None = None # Date of last API call (for daily reset)
self._is_fetching: bool = False # Flag to track active API fetch
self._last_coordinator_update: datetime | None = None # When Timer #1 last ran (_async_update_data)
self._lifecycle_callbacks: list[Callable[[], None]] = [] # Push-update callbacks for lifecycle sensor
# Start timers
self._listener_manager.schedule_quarter_hour_refresh(self._handle_quarter_hour_refresh)
@ -339,10 +356,10 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
current_date = now.date()
# First time check - initialize (no turnover needed)
if self._last_midnight_check is None:
if self._last_midnight_turnover_check is None:
return False
last_check_date = self._last_midnight_check.date()
last_check_date = self._last_midnight_turnover_check.date()
# Turnover needed if we've crossed into a new day
return current_date > last_check_date
@ -362,7 +379,9 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
"""
current_date = now.date()
last_check_date = self._last_midnight_check.date() if self._last_midnight_check else current_date
last_check_date = (
self._last_midnight_turnover_check.date() if self._last_midnight_turnover_check else current_date
)
self._log(
"debug",
@ -391,7 +410,8 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
self.data["timestamp"] = now
# Mark turnover as done for today (atomic update)
self._last_midnight_check = now
self._last_midnight_turnover_check = now
self._last_actual_turnover = now # Record when actual turnover happened
@callback
def _check_and_handle_midnight_turnover(self, now: datetime) -> bool:
@ -424,6 +444,26 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
return True
def register_lifecycle_callback(self, callback: Callable[[], None]) -> None:
    """
    Register a push-update callback for lifecycle state changes.

    Lets the lifecycle sensor receive an immediate notification when the
    coordinator's lifecycle state changes, rather than waiting for the
    next polling cycle.

    Args:
        callback: Invoked on every lifecycle state change
            (typically the sensor's async_write_ha_state).

    """
    # Guard clause: never register the same callable twice.
    if callback in self._lifecycle_callbacks:
        return
    self._lifecycle_callbacks.append(callback)
def _notify_lifecycle_change(self) -> None:
"""Notify registered callbacks about lifecycle state change (push update)."""
for lifecycle_callback in self._lifecycle_callbacks:
lifecycle_callback()
async def async_shutdown(self) -> None:
"""Shut down the coordinator and clean up timers."""
self._listener_manager.cancel_timers()
@ -448,10 +488,20 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
"""
self._log("debug", "[Timer #1] DataUpdateCoordinator check triggered")
# Track when Timer #1 ran (for next_api_poll calculation)
self._last_coordinator_update = self.time.now()
# Create TimeService with fresh reference time for this update cycle
self.time = TibberPricesTimeService()
current_time = self.time.now()
# Transition lifecycle state from "fresh" to "cached" if enough time passed
# (5 minutes threshold defined in lifecycle calculator)
if self._lifecycle_state == "fresh" and self._last_price_update:
age = current_time - self._last_price_update
if age.total_seconds() > FRESH_TO_CACHED_SECONDS:
self._lifecycle_state = "cached"
# Update helper modules with fresh TimeService instance
self.api.time = self.time
self._data_fetcher.time = self.time
@ -462,9 +512,9 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
if self._cached_price_data is None and self._cached_user_data is None:
await self.load_cache()
# Initialize midnight check on first run
if self._last_midnight_check is None:
self._last_midnight_check = current_time
# Initialize midnight turnover check on first run
if self._last_midnight_turnover_check is None:
self._last_midnight_turnover_check = current_time
# CRITICAL: Check for midnight turnover FIRST (before any data operations)
# This prevents race condition with Timer #2 (quarter-hour refresh)
@ -481,6 +531,17 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
try:
if self.is_main_entry():
# Set lifecycle state to refreshing before API call
self._lifecycle_state = "refreshing"
self._is_fetching = True
self._notify_lifecycle_change() # Push update: now refreshing
# Reset API call counter if day changed
current_date = current_time.date()
if self._last_api_call_date != current_date:
self._api_calls_today = 0
self._last_api_call_date = current_date
# Main entry fetches data for all homes
configured_home_ids = self._get_configured_home_ids()
result = await self._data_fetcher.handle_main_entry_update(
@ -488,11 +549,20 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
configured_home_ids,
self._transform_data_for_main_entry,
)
# Update lifecycle tracking after successful fetch
self._is_fetching = False
self._api_calls_today += 1
self._lifecycle_state = "fresh" # Data just fetched
self._notify_lifecycle_change() # Push update: fresh data available
# CRITICAL: Sync cached_user_data after API call (for new integrations without cache)
# handle_main_entry_update() may have fetched user_data via update_user_data_if_needed()
self._cached_user_data = self._data_fetcher.cached_user_data
# Sync _last_price_update for lifecycle tracking
self._last_price_update = self._data_fetcher._last_price_update # noqa: SLF001 - Sync for lifecycle tracking
return result
# Subentries get data from main coordinator
# Subentries get data from main coordinator (no lifecycle tracking - they don't fetch)
return await self._handle_subentry_update()
except (
@ -500,6 +570,10 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
TibberPricesApiClientCommunicationError,
TibberPricesApiClientError,
) as err:
# Reset lifecycle state on error
self._is_fetching = False
self._lifecycle_state = "error"
self._notify_lifecycle_change() # Push update: error occurred
return await self._data_fetcher.handle_api_error(
err,
self._transform_data_for_main_entry,
@ -556,6 +630,17 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
# Sync legacy references
self._cached_price_data = self._data_fetcher.cached_price_data
self._cached_user_data = self._data_fetcher.cached_user_data
self._last_price_update = self._data_fetcher._last_price_update # noqa: SLF001 - Sync for lifecycle tracking
self._last_user_update = self._data_fetcher._last_user_update # noqa: SLF001 - Sync for lifecycle tracking
# Initialize _last_actual_turnover: If cache is from today, assume turnover happened at midnight
if self._last_price_update:
cache_date = self.time.as_local(self._last_price_update).date()
today_date = self.time.as_local(self.time.now()).date()
if cache_date == today_date:
# Cache is from today, so midnight turnover already happened
today_midnight = self.time.as_local(self.time.now()).replace(hour=0, minute=0, second=0, microsecond=0)
self._last_actual_turnover = today_midnight
def _perform_midnight_turnover(self, price_info: dict[str, Any]) -> dict[str, Any]:
"""
@ -578,7 +663,7 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
async def _store_cache(self) -> None:
"""Store cache data."""
await self._data_fetcher.store_cache(self._last_midnight_check)
await self._data_fetcher.store_cache(self._last_midnight_turnover_check)
def _needs_tomorrow_data(self, tomorrow_date: date) -> bool:
"""Check if tomorrow data is missing or invalid."""
@ -654,13 +739,13 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
now_local = self.time.as_local(current_time)
current_date = now_local.date()
if self._last_midnight_check is None:
if self._last_transformation_time is None:
return True
last_check_local = self.time.as_local(self._last_midnight_check)
last_check_date = last_check_local.date()
last_transform_local = self.time.as_local(self._last_transformation_time)
last_transform_date = last_transform_local.date()
if current_date != last_check_date:
if current_date != last_transform_date:
self._log("debug", "Midnight turnover detected, retransforming data")
return True
@ -687,7 +772,7 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
# Cache the transformed data
self._cached_transformed_data = transformed_data
self._last_transformation_config = self._get_current_transformation_config()
self._last_midnight_check = current_time
self._last_transformation_time = current_time
return transformed_data
@ -716,7 +801,7 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
# Cache the transformed data
self._cached_transformed_data = transformed_data
self._last_transformation_config = self._get_current_transformation_config()
self._last_midnight_check = current_time
self._last_transformation_time = current_time
return transformed_data

View file

@ -298,10 +298,10 @@
"long_description": "Zeigt die Volatilität über heute und morgen zusammen (wenn morgige Daten verfügbar sind). Bietet eine erweiterte Ansicht der Preisvariation über bis zu 48 Stunden. Fällt auf Nur-Heute zurück, wenn morgige Daten noch nicht verfügbar sind.",
"usage_tips": "Verwenden Sie dies für Mehrtagsplanung und um zu verstehen, ob Preismöglichkeiten über die Tagesgrenze hinweg bestehen. Die Attribute 'today_volatility' und 'tomorrow_volatility' zeigen individuelle Tagesbeiträge. Nützlich für die Planung von Ladesitzungen, die Mitternacht überschreiten könnten."
},
"price_forecast": {
"description": "Prognose kommender Strompreise",
"long_description": "Zeigt kommende Strompreise für zukünftige Intervalle in einem Format, das einfach in Dashboards verwendet werden kann",
"usage_tips": "Verwenden Sie die Attribute dieser Entität, um kommende Preise in Diagrammen oder benutzerdefinierten Karten anzuzeigen. Greifen Sie entweder auf 'intervals' für alle zukünftigen Intervalle oder auf 'hours' für stündliche Zusammenfassungen zu."
"data_lifecycle_status": {
"description": "Aktueller Status des Preisdaten-Lebenszyklus und der Zwischenspeicherung",
"long_description": "Zeigt an, ob die Integration zwischengespeicherte Daten oder frische Daten von der API verwendet. Zeigt aktuellen Lebenszyklus-Status: 'cached' (verwendet gespeicherte Daten), 'fresh' (gerade von API abgerufen), 'refreshing' (wird gerade abgerufen), 'searching_tomorrow' (sucht aktiv nach Morgendaten nach 13:00 Uhr), 'turnover_pending' (innerhalb 15 Minuten vor Mitternacht, 23:45-00:00) oder 'error' (Abruf fehlgeschlagen). Enthält umfassende Attribute wie Cache-Alter, nächste API-Abfragezeit, Datenvollständigkeit und API-Aufruf-Statistiken.",
"usage_tips": "Verwenden Sie diesen Diagnosesensor, um Datenaktualität und API-Aufrufmuster zu verstehen. Prüfen Sie das 'cache_age'-Attribut, um zu sehen, wie alt die aktuellen Daten sind. Überwachen Sie 'next_api_poll', um zu wissen, wann das nächste Update geplant ist. Verwenden Sie 'data_completeness', um zu sehen, ob Daten für gestern/heute/morgen verfügbar sind. Der 'api_calls_today'-Zähler hilft, die API-Nutzung zu verfolgen. Perfekt zur Fehlersuche oder zum Verständnis des Integrationsverhaltens."
},
"best_price_end_time": {
"description": "Wann der aktuelle oder nächste günstige Zeitraum endet",

View file

@ -298,10 +298,10 @@
"long_description": "Shows volatility across both today and tomorrow combined (when tomorrow's data is available). Provides an extended view of price variation spanning up to 48 hours. Falls back to today-only when tomorrow's data isn't available yet.",
"usage_tips": "Use this for multi-day planning and to understand if price opportunities exist across the day boundary. The 'today_volatility' and 'tomorrow_volatility' breakdown attributes show individual day contributions. Useful for scheduling charging sessions that might span midnight."
},
"price_forecast": {
"description": "Forecast of upcoming electricity prices",
"long_description": "Shows upcoming electricity prices for future intervals in a format that's easy to use in dashboards",
"usage_tips": "Use this entity's attributes to display upcoming prices in charts or custom cards. Access either 'intervals' for all future intervals or 'hours' for hourly summaries."
"data_lifecycle_status": {
"description": "Current state of price data lifecycle and caching",
"long_description": "Shows whether the integration is using cached data or fresh data from the API. Displays current lifecycle state: 'cached' (using stored data), 'fresh' (just fetched from API), 'refreshing' (currently fetching), 'searching_tomorrow' (actively polling for tomorrow's data after 13:00), 'turnover_pending' (within 15 minutes of midnight, 23:45-00:00), or 'error' (fetch failed). Includes comprehensive attributes like cache age, next API poll time, data completeness, and API call statistics.",
"usage_tips": "Use this diagnostic sensor to understand data freshness and API call patterns. Check 'cache_age' attribute to see how old the current data is. Monitor 'next_api_poll' to know when the next update is scheduled. Use 'data_completeness' to see if yesterday/today/tomorrow data is available. The 'api_calls_today' counter helps track API usage. Perfect for troubleshooting or understanding the integration's behavior."
},
"best_price_end_time": {
"description": "When the current or next best price period ends",

View file

@ -295,13 +295,13 @@
},
"today_tomorrow_volatility": {
"description": "Kombinert prisvolatilitetsklassifisering for i dag og i morgen",
"long_description": "Viser volatilitet på tvers av både i dag og i morgen kombinert (når morgendagens data er tilgjengelig). Gir en utvidet oversikt over prisvariasjoner som spenner over opptil 48 timer. Faller tilbake til kun i dag når morgendagens data ikke er tilgjengelig ennå.",
"usage_tips": "Bruk dette til flerørs planlegging og for å forstå om prismuligheter eksisterer på tvers av daggrensen. Attributtene 'today_volatility' og 'tomorrow_volatility' viser individuelle dagsbidrag. Nyttig for planlegging av ladesesjoner som kan strekke seg over midnatt."
"long_description": "Viser volatilitet på tvers av både i dag og i morgen kombinert (når morgendagens data er tilgjengelig). Gir en utvidet visning av prisvariasjoner som spenner over opptil 48 timer. Faller tilbake til bare i dag når morgendagens data ikke er tilgjengelig ennå.",
"usage_tips": "Bruk dette for flerdagers planlegging og for å forstå om prismuligheter eksisterer på tvers av daggrensen. Attributtene 'today_volatility' og 'tomorrow_volatility' viser individuelle dagbidrag. Nyttig for planlegging av ladeøkter som kan strekke seg over midnatt."
},
"price_forecast": {
"description": "Prognose for kommende elektrisitetspriser",
"long_description": "Viser kommende elektrisitetspriser for fremtidige intervaller i et format som er enkelt å bruke i dashboards",
"usage_tips": "Bruk denne entitetens attributter til å vise kommende priser i diagrammer eller tilpassede kort. Få tilgang til enten 'intervals' for alle fremtidige intervaller eller 'hours' for timesammendrag."
"data_lifecycle_status": {
"description": "Gjeldende tilstand for prisdatalivssyklus og hurtigbufring",
"long_description": "Viser om integrasjonen bruker hurtigbufrede data eller ferske data fra API-et. Viser gjeldende livssyklustilstand: 'cached' (bruker lagrede data), 'fresh' (nettopp hentet fra API), 'refreshing' (henter for øyeblikket), 'searching_tomorrow' (søker aktivt etter morgendagens data etter 13:00), 'turnover_pending' (innen 15 minutter før midnatt, 23:45-00:00), eller 'error' (henting mislyktes). Inkluderer omfattende attributter som cache-alder, neste API-spørring, datafullstendighet og API-anropsstatistikk.",
"usage_tips": "Bruk denne diagnosesensoren for å forstå dataferskhet og API-anropsmønstre. Sjekk 'cache_age'-attributtet for å se hvor gamle de nåværende dataene er. Overvåk 'next_api_poll' for å vite når neste oppdatering er planlagt. Bruk 'data_completeness' for å se om data for i går/i dag/i morgen er tilgjengelig. 'api_calls_today'-telleren hjelper med å spore API-bruk. Perfekt for feilsøking eller forståelse av integrasjonens oppførsel."
},
"best_price_end_time": {
"description": "Når gjeldende eller neste billigperiode slutter",

View file

@ -295,13 +295,13 @@
},
"today_tomorrow_volatility": {
"description": "Gecombineerde prijsvolatiliteitsclassificatie voor vandaag en morgen",
"long_description": "Toont volatiliteit over zowel vandaag als morgen gecombineerd (wanneer de gegevens van morgen beschikbaar zijn). Biedt een uitgebreid beeld van prijsvariatie over maximaal 48 uur. Valt terug op alleen vandaag wanneer de gegevens van morgen nog niet beschikbaar zijn.",
"usage_tips": "Gebruik dit voor meerdaagse planning en om te begrijpen of prijskansen bestaan over de daggrens heen. De 'today_volatility' en 'tomorrow_volatility' breakdown-attributen tonen individuele dagbijdragen. Nuttig voor het plannen van laadsessies die middernacht kunnen overschrijden."
"long_description": "Toont volatiliteit over zowel vandaag als morgen gecombineerd (wanneer de gegevens van morgen beschikbaar zijn). Biedt een uitgebreid overzicht van prijsvariatie over maximaal 48 uur. Valt terug op alleen vandaag wanneer de gegevens van morgen nog niet beschikbaar zijn.",
"usage_tips": "Gebruik dit voor meerdaagse planning en om te begrijpen of prijskansen bestaan over de daggrenzen heen. De attributen 'today_volatility' en 'tomorrow_volatility' tonen individuele dagbijdragen. Handig voor het plannen van laadsessies die middernacht kunnen overschrijden."
},
"price_forecast": {
"description": "Prognose van aanstaande elektriciteitsprijzen",
"long_description": "Toont aanstaande elektriciteitsprijzen voor toekomstige intervallen in een formaat dat gemakkelijk te gebruiken is in dashboards",
"usage_tips": "Gebruik de attributen van deze entiteit om aanstaande prijzen weer te geven in grafieken of aangepaste kaarten. Toegang tot 'intervals' voor alle toekomstige intervallen of 'hours' voor uuroverzichten."
"data_lifecycle_status": {
"description": "Huidige status van prijsgegevenslevenscyclus en caching",
"long_description": "Toont of de integratie gebruikmaakt van gecachte gegevens of verse gegevens van de API. Toont huidige levenscyclusstatus: 'cached' (gebruikt opgeslagen gegevens), 'fresh' (net opgehaald van API), 'refreshing' (momenteel aan het ophalen), 'searching_tomorrow' (actief aan het zoeken naar morgengegevens na 13:00), 'turnover_pending' (binnen 15 minuten voor middernacht, 23:45-00:00), of 'error' (ophalen mislukt). Bevat uitgebreide attributen zoals cache-leeftijd, volgende API-poll-tijd, gegevensvolledigheid en API-aanroepstatistieken.",
"usage_tips": "Gebruik deze diagnostische sensor om gegevensfrisheid en API-aanroeppatronen te begrijpen. Controleer het 'cache_age'-attribuut om te zien hoe oud de huidige gegevens zijn. Monitor 'next_api_poll' om te weten wanneer de volgende update is gepland. Gebruik 'data_completeness' om te zien of gisteren/vandaag/morgen gegevens beschikbaar zijn. De 'api_calls_today'-teller helpt API-gebruik bij te houden. Perfect voor probleemoplossing of begrip van integratiegedrag."
},
"best_price_end_time": {
"description": "Wanneer de huidige of volgende goedkope periode eindigt",

View file

@ -295,13 +295,13 @@
},
"today_tomorrow_volatility": {
"description": "Kombinerad prisvolatilitetsklassificering för idag och imorgon",
"long_description": "Visar volatilitet över både idag och imorgon kombinerat (när morgondagens data är tillgänglig). Ger en utökad vy av prisvariationen som sträcker sig upp till 48 timmar. Faller tillbaka på endast idag när morgondagens data inte är tillgänglig än.",
"usage_tips": "Använd detta för flerdagarsplanering och för att förstå om prismöjligheter finns över daggränsen. 'today_volatility' och 'tomorrow_volatility' uppdelningsattributen visar individuella dagsbidrag. Användbart för planering av laddningssessioner som kan sträcka sig över midnatt."
"long_description": "Visar volatilitet över både idag och imorgon kombinerat (när morgondagens data är tillgänglig). Ger en utökad vy av prisvariation över upp till 48 timmar. Faller tillbaka till endast idag när morgondagens data inte är tillgänglig ännu.",
"usage_tips": "Använd detta för flerdagarsplanering och för att förstå om prismöjligheter existerar över dagsgränsen. Attributen 'today_volatility' och 'tomorrow_volatility' visar individuella dagsbidrag. Användbart för planering av laddningssessioner som kan sträcka sig över midnatt."
},
"price_forecast": {
"description": "Prognos för kommande elpriser",
"long_description": "Visar kommande elpriser för framtida intervaller i ett format som är enkelt att använda i instrumentpaneler",
"usage_tips": "Använd denna enhets attribut för att visa kommande priser i diagram eller anpassade kort. Få åtkomst till antingen 'intervals' för alla framtida intervaller eller 'hours' för timvisa sammanfattningar."
"data_lifecycle_status": {
"description": "Aktuell status för prisdatalivscykel och cachning",
"long_description": "Visar om integrationen använder cachad data eller färsk data från API:et. Visar aktuell livscykelstatus: 'cached' (använder lagrad data), 'fresh' (nyss hämtad från API), 'refreshing' (hämtar för närvarande), 'searching_tomorrow' (söker aktivt efter morgondagens data efter 13:00), 'turnover_pending' (inom 15 minuter före midnatt, 23:45-00:00), eller 'error' (hämtning misslyckades). Inkluderar omfattande attribut som cache-ålder, nästa API-polling, datafullständighet och API-anropsstatistik.",
"usage_tips": "Använd denna diagnostiksensor för att förstå datafärskhet och API-anropsmönster. Kontrollera 'cache_age'-attributet för att se hur gammal den aktuella datan är. Övervaka 'next_api_poll' för att veta när nästa uppdatering är schemalagd. Använd 'data_completeness' för att se om data för igår/idag/imorgon är tillgänglig. Räknaren 'api_calls_today' hjälper till att spåra API-användning. Perfekt för felsökning eller förståelse av integrationens beteende."
},
"best_price_end_time": {
"description": "När nuvarande eller nästa billigperiod slutar",

View file

@ -25,8 +25,9 @@ if TYPE_CHECKING:
# Import from specialized modules
from .daily_stat import add_statistics_attributes
from .future import add_next_avg_attributes, add_price_forecast_attributes, get_future_prices
from .future import add_next_avg_attributes, get_future_prices
from .interval import add_current_interval_price_attributes
from .lifecycle import build_lifecycle_attributes
from .timing import _is_timing_or_volatility_sensor
from .trend import _add_cached_trend_attributes, _add_timing_or_volatility_attributes
from .volatility import add_volatility_type_attributes, get_prices_for_volatility
@ -130,8 +131,12 @@ def build_sensor_attributes(
cached_data=cached_data,
time=time,
)
elif key == "price_forecast":
add_price_forecast_attributes(attributes=attributes, coordinator=coordinator, time=time)
elif key == "data_lifecycle_status":
# Lifecycle sensor uses dedicated builder with calculator
lifecycle_calculator = cached_data.get("lifecycle_calculator")
if lifecycle_calculator:
lifecycle_attrs = build_lifecycle_attributes(coordinator, lifecycle_calculator)
attributes.update(lifecycle_attrs)
elif _is_timing_or_volatility_sensor(key):
_add_timing_or_volatility_attributes(attributes, key, cached_data, native_value, time=time)

View file

@ -2,8 +2,7 @@
from __future__ import annotations
from datetime import datetime
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from custom_components.tibber_prices.coordinator.core import (
@ -66,100 +65,6 @@ def add_next_avg_attributes(
attributes["hours"] = hours
def add_price_forecast_attributes(
attributes: dict,
coordinator: TibberPricesDataUpdateCoordinator,
*,
time: TibberPricesTimeService,
) -> None:
"""
Add forecast attributes for the price forecast sensor.
Args:
attributes: Dictionary to add attributes to
coordinator: The data update coordinator
time: TibberPricesTimeService instance (required)
"""
future_prices = get_future_prices(coordinator, max_intervals=MAX_FORECAST_INTERVALS, time=time)
if not future_prices:
attributes["intervals"] = []
attributes["intervals_by_hour"] = []
attributes["data_available"] = False
return
# Add timestamp attribute (first future interval)
if future_prices:
attributes["timestamp"] = future_prices[0]["interval_start"]
attributes["intervals"] = future_prices
attributes["data_available"] = True
# Group by hour for easier consumption in dashboards
hours: dict[str, Any] = {}
for interval in future_prices:
# interval_start is already a datetime object (from coordinator data)
starts_at = interval["interval_start"]
if not isinstance(starts_at, datetime):
# Fallback: parse if it's still a string (shouldn't happen)
starts_at = datetime.fromisoformat(starts_at)
hour_key = starts_at.strftime("%Y-%m-%d %H")
if hour_key not in hours:
hours[hour_key] = {
"hour": starts_at.hour,
"day": interval["day"],
"date": starts_at.date(),
"intervals": [],
"min_price": None,
"max_price": None,
"avg_price": 0,
"avg_rating": None, # Initialize rating tracking
"ratings_available": False, # Track if any ratings are available
}
# Create interval data with both price and rating info
interval_data = {
"minute": starts_at.minute,
"price": interval["price"],
"price_minor": interval["price_minor"],
"level": interval["level"], # Price level from priceInfo
"time": starts_at.strftime("%H:%M"),
}
# Add rating data if available
if interval["rating"] is not None:
interval_data["rating"] = interval["rating"]
interval_data["rating_level"] = interval["rating_level"]
hours[hour_key]["ratings_available"] = True
hours[hour_key]["intervals"].append(interval_data)
# Track min/max/avg for the hour
price = interval["price"]
if hours[hour_key]["min_price"] is None or price < hours[hour_key]["min_price"]:
hours[hour_key]["min_price"] = price
if hours[hour_key]["max_price"] is None or price > hours[hour_key]["max_price"]:
hours[hour_key]["max_price"] = price
# Calculate averages
for hour_data in hours.values():
prices = [interval["price"] for interval in hour_data["intervals"]]
if prices:
hour_data["avg_price"] = sum(prices) / len(prices)
hour_data["min_price"] = hour_data["min_price"]
hour_data["max_price"] = hour_data["max_price"]
# Calculate average rating if ratings are available
if hour_data["ratings_available"]:
ratings = [interval.get("rating") for interval in hour_data["intervals"] if "rating" in interval]
if ratings:
hour_data["avg_rating"] = sum(ratings) / len(ratings)
# Convert to list sorted by hour
attributes["intervals_by_hour"] = [hour_data for _, hour_data in sorted(hours.items())]
def get_future_prices(
coordinator: TibberPricesDataUpdateCoordinator,
max_intervals: int | None = None,

View file

@ -0,0 +1,93 @@
"""Attribute builders for lifecycle diagnostic sensor."""
from __future__ import annotations
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from custom_components.tibber_prices.coordinator.core import (
TibberPricesDataUpdateCoordinator,
)
from custom_components.tibber_prices.sensor.calculators.lifecycle import (
TibberPricesLifecycleCalculator,
)
# Constants for cache age formatting
MINUTES_PER_HOUR = 60
MINUTES_PER_DAY = 1440  # 24 * 60


def build_lifecycle_attributes(
    coordinator: TibberPricesDataUpdateCoordinator,
    lifecycle_calculator: TibberPricesLifecycleCalculator,
) -> dict[str, Any]:
    """
    Build attributes for data_lifecycle_status sensor.

    Shows comprehensive cache status, data availability, and update timing.

    Returns:
        Dict with lifecycle attributes

    """
    attrs: dict[str, Any] = {}

    # --- Cache status (human-readable plus raw value) -------------------
    age_minutes = lifecycle_calculator.get_cache_age_minutes()
    if age_minutes is not None:
        if age_minutes >= MINUTES_PER_DAY:  # 24h or more -> "Xd" / "Xd Yh"
            days, remainder = divmod(age_minutes, MINUTES_PER_DAY)
            full_hours = remainder // MINUTES_PER_HOUR
            attrs["cache_age"] = f"{days}d {full_hours}h" if full_hours else f"{days}d"
        elif age_minutes >= MINUTES_PER_HOUR:  # 1h..24h -> "Xh" / "Xh Ymin"
            full_hours, leftover = divmod(age_minutes, MINUTES_PER_HOUR)
            attrs["cache_age"] = f"{full_hours}h {leftover}min" if leftover else f"{full_hours}h"
        else:  # under an hour -> "X min"
            attrs["cache_age"] = f"{age_minutes} min"
        # Raw minutes kept alongside the formatted string for automations
        attrs["cache_age_minutes"] = age_minutes

    attrs["cache_validity"] = lifecycle_calculator.get_cache_validity_status()

    last_fetch = coordinator._last_price_update  # noqa: SLF001 - Internal state access for diagnostic display
    if last_fetch:
        fetch_stamp = last_fetch.isoformat()
        attrs["last_api_fetch"] = fetch_stamp
        attrs["last_cache_update"] = fetch_stamp

    # --- Data availability & completeness -------------------------------
    attrs["data_completeness"] = lifecycle_calculator.get_data_completeness_status()
    for day in ("yesterday", "today", "tomorrow"):
        attrs[f"{day}_available"] = lifecycle_calculator.is_data_available(day)
    attrs["tomorrow_expected_after"] = "13:00"

    # --- Next actions (omitted when not meaningful) ----------------------
    upcoming_poll = lifecycle_calculator.get_next_api_poll_time()
    if upcoming_poll:  # None means data is complete, no more polls needed
        attrs["next_api_poll"] = upcoming_poll.isoformat()

    tomorrow_check = lifecycle_calculator.get_next_tomorrow_check_time()
    if tomorrow_check:
        attrs["next_tomorrow_check"] = tomorrow_check.isoformat()

    attrs["next_midnight_turnover"] = lifecycle_calculator.get_next_midnight_turnover_time().isoformat()

    # --- Update statistics -----------------------------------------------
    attrs["updates_today"] = lifecycle_calculator.get_api_calls_today()

    turnover = coordinator._last_actual_turnover  # noqa: SLF001 - Internal state access for diagnostic display
    if turnover:
        attrs["last_turnover"] = turnover.isoformat()

    # --- Last error (if any) ---------------------------------------------
    if coordinator.last_exception:
        attrs["last_error"] = str(coordinator.last_exception)

    return attrs

View file

@ -13,6 +13,7 @@ from __future__ import annotations
from .base import TibberPricesBaseCalculator
from .daily_stat import TibberPricesDailyStatCalculator
from .interval import TibberPricesIntervalCalculator
from .lifecycle import TibberPricesLifecycleCalculator
from .metadata import TibberPricesMetadataCalculator
from .rolling_hour import TibberPricesRollingHourCalculator
from .timing import TibberPricesTimingCalculator
@ -24,6 +25,7 @@ __all__ = [
"TibberPricesBaseCalculator",
"TibberPricesDailyStatCalculator",
"TibberPricesIntervalCalculator",
"TibberPricesLifecycleCalculator",
"TibberPricesMetadataCalculator",
"TibberPricesRollingHourCalculator",
"TibberPricesTimingCalculator",

View file

@ -0,0 +1,277 @@
"""Calculator for data lifecycle status tracking."""
from __future__ import annotations
from datetime import timedelta
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from datetime import datetime
from custom_components.tibber_prices.coordinator.constants import UPDATE_INTERVAL
from .base import TibberPricesBaseCalculator
# Constants for lifecycle state determination
FRESH_DATA_THRESHOLD_MINUTES = 5  # Data is "fresh" within 5 minutes of API fetch
TOMORROW_CHECK_HOUR = 13  # After 13:00, we actively check for tomorrow data
# 15 minutes (900 s): the warning window must cover the 23:45 quarter-hour poll
# so "turnover_pending" is actually observed before midnight. A 5-minute window
# (300 s) falls entirely between the 23:45 poll and 00:00 and would never be
# reported by the quarter-hour update cycle.
TURNOVER_WARNING_SECONDS = 900

# Constants for 15-minute update boundaries (Timer #1)
QUARTER_HOUR_BOUNDARIES = [0, 15, 30, 45]  # Minutes when Timer #1 can trigger
LAST_HOUR_OF_DAY = 23


class TibberPricesLifecycleCalculator(TibberPricesBaseCalculator):
    """Calculate data lifecycle status and metadata."""

    def get_lifecycle_state(self) -> str:
        """
        Determine current data lifecycle state.

        Returns one of:
        - "cached": Using cached data (normal operation)
        - "fresh": Just fetched from API (within 5 minutes)
        - "refreshing": Currently fetching data from API
        - "searching_tomorrow": After 13:00, actively looking for tomorrow data
        - "turnover_pending": Midnight is approaching (within 15 minutes)
        - "error": Last API call failed
        """
        coordinator = self.coordinator
        current_time = coordinator.time.now()

        # Check if actively fetching (push-updated state, highest priority)
        if coordinator._is_fetching:  # noqa: SLF001 - Internal state access for lifecycle tracking
            return "refreshing"

        # Check if last update failed
        # If coordinator has last_exception set, the last fetch failed
        if coordinator.last_exception is not None:
            return "error"

        # Check if data is fresh (within FRESH_DATA_THRESHOLD_MINUTES of last API fetch)
        if coordinator._last_price_update:  # noqa: SLF001 - Internal state access for lifecycle tracking
            age = current_time - coordinator._last_price_update  # noqa: SLF001
            if age <= timedelta(minutes=FRESH_DATA_THRESHOLD_MINUTES):
                return "fresh"

        # Check if midnight turnover is pending (within TURNOVER_WARNING_SECONDS,
        # i.e. 23:45-00:00 local time with the 15-minute window)
        now_local = coordinator.time.as_local(current_time)
        midnight = now_local.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1)
        time_to_midnight = (midnight - now_local).total_seconds()
        if 0 < time_to_midnight <= TURNOVER_WARNING_SECONDS:
            return "turnover_pending"

        # Check if we're in tomorrow data search mode (after 13:00 and tomorrow missing)
        if now_local.hour >= TOMORROW_CHECK_HOUR:
            _, tomorrow_midnight = coordinator.time.get_day_boundaries("today")
            tomorrow_date = tomorrow_midnight.date()
            if coordinator._needs_tomorrow_data(tomorrow_date):  # noqa: SLF001 - Internal state access
                return "searching_tomorrow"

        # Default: using cached data
        return "cached"

    def get_cache_age_minutes(self) -> int | None:
        """Calculate how many minutes old the cached data is (None if never fetched)."""
        coordinator = self.coordinator
        if not coordinator._last_price_update:  # noqa: SLF001 - Internal state access for lifecycle tracking
            return None
        age = coordinator.time.now() - coordinator._last_price_update  # noqa: SLF001
        return int(age.total_seconds() / 60)

    def get_next_api_poll_time(self) -> datetime | None:
        """
        Calculate when the next API poll attempt will occur.

        Timer #1 runs every 15 minutes FROM INTEGRATION START, not at fixed boundaries.
        For example, if integration started at 13:07, timer runs at 13:07, 13:22, 13:37, 13:52.

        Returns:
            Next poll time when tomorrow data will be fetched (predictive).

        Logic:
        - If before 13:00 today: Show today 13:00 (when tomorrow-search begins)
        - If after 13:00 today AND tomorrow data missing: Show next Timer #1 execution (intensive polling)
        - If after 13:00 today AND tomorrow data present: Show tomorrow 13:00 (predictive!)
        """
        coordinator = self.coordinator
        current_time = coordinator.time.now()
        now_local = coordinator.time.as_local(current_time)

        # Check if tomorrow data is missing
        _, tomorrow_midnight = coordinator.time.get_day_boundaries("today")
        tomorrow_date = tomorrow_midnight.date()
        tomorrow_missing = coordinator._needs_tomorrow_data(tomorrow_date)  # noqa: SLF001

        # Case 1: Before 13:00 today - next poll is today at 13:00 (when tomorrow-search begins)
        if now_local.hour < TOMORROW_CHECK_HOUR:
            return now_local.replace(hour=TOMORROW_CHECK_HOUR, minute=0, second=0, microsecond=0)

        # Case 2: After 13:00 today AND tomorrow data missing - actively polling now
        if tomorrow_missing:
            # Calculate next Timer #1 execution based on last coordinator update
            if coordinator._last_coordinator_update is not None:  # noqa: SLF001
                next_timer = coordinator._last_coordinator_update + UPDATE_INTERVAL  # noqa: SLF001
                return coordinator.time.as_local(next_timer)
            # Fallback: If we don't know when last update was, estimate from now
            # (Should rarely happen - only on first startup before first Timer #1 run)
            return now_local + UPDATE_INTERVAL

        # Case 3: After 13:00 today AND tomorrow data present - PREDICTIVE: next fetch is tomorrow 13:xx
        # After midnight turnover, tomorrow becomes today, and we'll need NEW tomorrow data
        # Calculate tomorrow's first Timer #1 execution after 13:00 based on current timer offset
        tomorrow_start = now_local.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1)
        tomorrow_13 = tomorrow_start.replace(hour=TOMORROW_CHECK_HOUR, minute=0, second=0, microsecond=0)

        if coordinator._last_coordinator_update is not None:  # noqa: SLF001
            last_update_local = coordinator.time.as_local(coordinator._last_coordinator_update)  # noqa: SLF001
            # Offset of the timer within its quarter-hour cycle.
            # Example: Timer runs at 13:04:37 -> offset is 4 minutes 37 seconds from 13:00:00
            minutes_past_quarter = last_update_local.minute % 15
            seconds_offset = last_update_local.second
            # First Timer #1 execution at or after 13:00:00 tomorrow.
            # Since minutes_past_quarter is in [0, 15), candidate_time is already
            # >= tomorrow_13; the guard below is purely defensive.
            candidate_time = tomorrow_13.replace(minute=minutes_past_quarter, second=seconds_offset, microsecond=0)
            if candidate_time < tomorrow_13:
                candidate_time += UPDATE_INTERVAL
            return candidate_time

        # Fallback: If we don't know the timer offset yet, assume 13:00:00
        return tomorrow_13

    def get_next_tomorrow_check_time(self) -> datetime | None:
        """
        Calculate when the next tomorrow data check will occur.

        Returns None if not applicable (before 13:00 or tomorrow already available).
        """
        coordinator = self.coordinator
        current_time = coordinator.time.now()
        now_local = coordinator.time.as_local(current_time)

        # Only relevant after 13:00
        if now_local.hour < TOMORROW_CHECK_HOUR:
            return None

        # Only relevant if tomorrow data is missing
        _, tomorrow_midnight = coordinator.time.get_day_boundaries("today")
        tomorrow_date = tomorrow_midnight.date()
        if not coordinator._needs_tomorrow_data(tomorrow_date):  # noqa: SLF001 - Internal state access
            return None

        # Next check = next regular API poll (same as get_next_api_poll_time)
        return self.get_next_api_poll_time()

    def get_next_midnight_turnover_time(self) -> datetime:
        """Calculate when the next midnight turnover will occur (local time)."""
        coordinator = self.coordinator
        now_local = coordinator.time.as_local(coordinator.time.now())
        return now_local.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1)

    def is_data_available(self, day: str) -> bool:
        """
        Check if data is available for a specific day.

        Args:
            day: "yesterday", "today", or "tomorrow"

        Returns:
            True if data exists and is not empty
        """
        coordinator = self.coordinator
        if not coordinator.data:
            return False
        price_info = coordinator.data.get("priceInfo", {})
        return bool(price_info.get(day, []))

    def get_data_completeness_status(self) -> str:
        """
        Get human-readable data completeness status.

        Returns:
            'complete': All data (yesterday/today/tomorrow) available
            'missing_tomorrow': Only yesterday and today available
            'missing_yesterday': Only today and tomorrow available
            'partial': Only today or some other partial combination
            'no_data': No data available at all
        """
        yesterday_available = self.is_data_available("yesterday")
        today_available = self.is_data_available("today")
        tomorrow_available = self.is_data_available("tomorrow")
        if yesterday_available and today_available and tomorrow_available:
            return "complete"
        if yesterday_available and today_available and not tomorrow_available:
            return "missing_tomorrow"
        if not yesterday_available and today_available and tomorrow_available:
            return "missing_yesterday"
        if today_available:
            return "partial"
        return "no_data"

    def get_cache_validity_status(self) -> str:
        """
        Get cache validity status.

        Returns:
            "valid": Cache is current and matches today's date
            "stale": Cache exists but is outdated
            "date_mismatch": Cache is from a different day
            "empty": No cache data
        """
        coordinator = self.coordinator
        # Check if coordinator has data (transformed, ready for entities)
        if not coordinator.data:
            return "empty"
        # Check if we have price update timestamp
        if not coordinator._last_price_update:  # noqa: SLF001 - Internal state access for lifecycle tracking
            return "empty"
        current_time = coordinator.time.now()
        current_local_date = coordinator.time.as_local(current_time).date()
        last_update_local_date = coordinator.time.as_local(coordinator._last_price_update).date()  # noqa: SLF001
        if current_local_date != last_update_local_date:
            return "date_mismatch"
        # Check if cache is stale (older than expected)
        age = current_time - coordinator._last_price_update  # noqa: SLF001
        # Consider stale if older than 2 hours (8 * 15-minute intervals)
        if age > timedelta(hours=2):
            return "stale"
        return "valid"

    def get_api_calls_today(self) -> int:
        """Get the number of API calls made today (0 if the counter is from a previous day)."""
        coordinator = self.coordinator
        # Reset counter if day changed
        current_date = coordinator.time.now().date()
        if coordinator._last_api_call_date != current_date:  # noqa: SLF001 - Internal state access
            return 0
        return coordinator._api_calls_today  # noqa: SLF001

View file

@ -49,12 +49,12 @@ from .attributes import (
add_volatility_type_attributes,
build_extra_state_attributes,
build_sensor_attributes,
get_future_prices,
get_prices_for_volatility,
)
from .calculators import (
TibberPricesDailyStatCalculator,
TibberPricesIntervalCalculator,
TibberPricesLifecycleCalculator,
TibberPricesMetadataCalculator,
TibberPricesRollingHourCalculator,
TibberPricesTimingCalculator,
@ -106,6 +106,7 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity):
self._interval_calculator = TibberPricesIntervalCalculator(coordinator)
self._timing_calculator = TibberPricesTimingCalculator(coordinator)
self._trend_calculator = TibberPricesTrendCalculator(coordinator)
self._lifecycle_calculator = TibberPricesLifecycleCalculator(coordinator)
self._value_getter: Callable | None = self._get_value_getter()
self._time_sensitive_remove_listener: Callable | None = None
self._minute_update_remove_listener: Callable | None = None
@ -114,6 +115,10 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity):
self._chart_data_error = None # Track last service call error
self._chart_data_response = None # Store service response for attributes
# Register for push updates if this is the lifecycle sensor
if entity_description.key == "data_lifecycle_status":
coordinator.register_lifecycle_callback(self.async_write_ha_state)
async def async_added_to_hass(self) -> None:
"""When entity is added to hass."""
await super().async_added_to_hass()
@ -208,8 +213,8 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity):
timing_calculator=self._timing_calculator,
volatility_calculator=self._volatility_calculator,
metadata_calculator=self._metadata_calculator,
lifecycle_calculator=self._lifecycle_calculator,
get_next_avg_n_hours_value=self._get_next_avg_n_hours_value,
get_price_forecast_value=self._get_price_forecast_value,
get_data_timestamp=self._get_data_timestamp,
get_chart_data_export_value=self._get_chart_data_export_value,
)
@ -591,18 +596,6 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity):
# BEST/PEAK PRICE TIMING METHODS (period-based time tracking)
# ========================================================================
# Add method to get future price intervals
def _get_price_forecast_value(self) -> str | None:
"""Get the highest or lowest price status for the price forecast entity."""
future_prices = get_future_prices(
self.coordinator, max_intervals=MAX_FORECAST_INTERVALS, time=self.coordinator.time
)
if not future_prices:
return "No forecast data available"
# Return a simple status message indicating how much forecast data is available
return f"Forecast available for {len(future_prices)} intervals"
def _get_home_metadata_value(self, field: str) -> str | int | None:
"""
Get home metadata value from user data.
@ -875,6 +868,7 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity):
"last_rating_level": self._interval_calculator.get_last_rating_level(),
"data_timestamp": getattr(self, "_data_timestamp", None),
"rolling_hour_level": self._get_rolling_hour_level_for_cached_data(key),
"lifecycle_calculator": self._lifecycle_calculator, # For lifecycle sensor attributes
}
# Use the centralized attribute builder

View file

@ -835,20 +835,13 @@ PEAK_PRICE_TIMING_SENSORS = (
DIAGNOSTIC_SENSORS = (
SensorEntityDescription(
key="data_timestamp",
translation_key="data_timestamp",
name="Data Expiration",
icon="mdi:clock-check",
device_class=SensorDeviceClass.TIMESTAMP,
state_class=None, # Timestamps: no statistics
entity_category=EntityCategory.DIAGNOSTIC,
),
SensorEntityDescription(
key="price_forecast",
translation_key="price_forecast",
name="Price Forecast",
icon="mdi:chart-line",
state_class=None, # Text/status value: no statistics
key="data_lifecycle_status",
translation_key="data_lifecycle_status",
name="Data Lifecycle Status",
icon="mdi:database-sync",
device_class=SensorDeviceClass.ENUM,
options=["cached", "fresh", "refreshing", "searching_tomorrow", "turnover_pending", "error"],
state_class=None, # Status value: no statistics
entity_category=EntityCategory.DIAGNOSTIC,
),
# Home metadata from user data

View file

@ -19,6 +19,7 @@ if TYPE_CHECKING:
from custom_components.tibber_prices.sensor.calculators.daily_stat import TibberPricesDailyStatCalculator
from custom_components.tibber_prices.sensor.calculators.interval import TibberPricesIntervalCalculator
from custom_components.tibber_prices.sensor.calculators.lifecycle import TibberPricesLifecycleCalculator
from custom_components.tibber_prices.sensor.calculators.metadata import TibberPricesMetadataCalculator
from custom_components.tibber_prices.sensor.calculators.rolling_hour import TibberPricesRollingHourCalculator
from custom_components.tibber_prices.sensor.calculators.timing import TibberPricesTimingCalculator
@ -36,8 +37,8 @@ def get_value_getter_mapping( # noqa: PLR0913 - needs all calculators as parame
timing_calculator: TibberPricesTimingCalculator,
volatility_calculator: TibberPricesVolatilityCalculator,
metadata_calculator: TibberPricesMetadataCalculator,
lifecycle_calculator: TibberPricesLifecycleCalculator,
get_next_avg_n_hours_value: Callable[[int], float | None],
get_price_forecast_value: Callable[[], str | None],
get_data_timestamp: Callable[[], datetime | None],
get_chart_data_export_value: Callable[[], str | None],
) -> dict[str, Callable]:
@ -56,8 +57,8 @@ def get_value_getter_mapping( # noqa: PLR0913 - needs all calculators as parame
timing_calculator: Calculator for best/peak price period timing
volatility_calculator: Calculator for price volatility analysis
metadata_calculator: Calculator for home/metering metadata
lifecycle_calculator: Calculator for data lifecycle tracking
get_next_avg_n_hours_value: Method for next N-hour average forecasts
get_price_forecast_value: Method for price forecast sensor
get_data_timestamp: Method for data timestamp sensor
get_chart_data_export_value: Method for chart data export sensor
@ -203,8 +204,8 @@ def get_value_getter_mapping( # noqa: PLR0913 - needs all calculators as parame
"price_trend_12h": lambda: trend_calculator.get_price_trend_value(hours=12),
# Diagnostic sensors
"data_timestamp": get_data_timestamp,
# Price forecast sensor
"price_forecast": get_price_forecast_value,
# Data lifecycle status sensor
"data_lifecycle_status": lambda: lifecycle_calculator.get_lifecycle_state(),
# Home metadata sensors (via MetadataCalculator)
"home_type": lambda: metadata_calculator.get_home_metadata_value("type"),
"home_size": lambda: metadata_calculator.get_home_metadata_value("size"),

View file

@ -512,8 +512,16 @@
"monthly_rating": {
"name": "Monatliche Preisbewertung"
},
"data_timestamp": {
"name": "Preisdaten-Ablauf"
"data_lifecycle_status": {
"name": "Datenlebenszyklus-Status",
"state": {
"cached": "Zwischengespeichert",
"fresh": "Frisch",
"refreshing": "Aktualisiere",
"searching_tomorrow": "Suche Morgendaten",
"turnover_pending": "Mitternachtswechsel steht bevor",
"error": "Fehler"
}
},
"today_volatility": {
"name": "Volatilität heute",
@ -587,9 +595,6 @@
"peak_price_next_in_minutes": {
"name": "Spitzenpreis startet in"
},
"price_forecast": {
"name": "Preisprognose"
},
"home_type": {
"name": "Wohnungstyp",
"state": {

View file

@ -508,8 +508,16 @@
"monthly_rating": {
"name": "Monthly Price Rating"
},
"data_timestamp": {
"name": "Price Data Expiration"
"data_lifecycle_status": {
"name": "Data Lifecycle Status",
"state": {
"cached": "Cached",
"fresh": "Fresh",
"refreshing": "Refreshing",
"searching_tomorrow": "Searching Tomorrow",
"turnover_pending": "Turnover Pending",
"error": "Error"
}
},
"today_volatility": {
"name": "Today's Price Volatility",
@ -583,9 +591,6 @@
"peak_price_next_in_minutes": {
"name": "Peak Price Starts In"
},
"price_forecast": {
"name": "Price Forecast"
},
"home_type": {
"name": "Home Type",
"state": {

View file

@ -508,8 +508,16 @@
"monthly_rating": {
"name": "Månedlig prisvurdering"
},
"data_timestamp": {
"name": "Prisdata Utløp"
"data_lifecycle_status": {
"name": "Datalivssyklus-status",
"state": {
"cached": "Hurtigbufret",
"fresh": "Fersk",
"refreshing": "Oppdaterer",
"searching_tomorrow": "Søker morgendagens data",
"turnover_pending": "Midnattskifte venter",
"error": "Feil"
}
},
"today_volatility": {
"name": "Volatilitet i dag",
@ -583,9 +591,6 @@
"peak_price_next_in_minutes": {
"name": "Topppris starter om"
},
"price_forecast": {
"name": "Prisprognose"
},
"home_type": {
"name": "Boligtype",
"state": {

View file

@ -508,9 +508,6 @@
"monthly_rating": {
"name": "Maandelijkse prijsbeoordeling"
},
"data_timestamp": {
"name": "Prijsgegevens Vervaldatum"
},
"today_volatility": {
"name": "Volatiliteit vandaag",
"state": {
@ -583,9 +580,6 @@
"peak_price_next_in_minutes": {
"name": "Piekprijs start over"
},
"price_forecast": {
"name": "Prijsprognose"
},
"home_type": {
"name": "Woningtype",
"state": {
@ -664,6 +658,17 @@
"ready": "Klaar",
"error": "Fout"
}
},
"data_lifecycle_status": {
"name": "Datalevenscyclus-status",
"state": {
"cached": "In cache",
"fresh": "Vers",
"refreshing": "Vernieuwen",
"searching_tomorrow": "Zoekt morgengegevens",
"turnover_pending": "Middernachtwissel in behandeling",
"error": "Fout"
}
}
},
"binary_sensor": {
@ -896,4 +901,4 @@
}
},
"title": "Tibber Prijsinformatie & Beoordelingen"
}
}

View file

@ -508,8 +508,16 @@
"monthly_rating": {
"name": "Månatlig prisvärdering"
},
"data_timestamp": {
"name": "Prisdata Utgångsdatum"
"data_lifecycle_status": {
"name": "Datalivscykel-status",
"state": {
"cached": "Cachad",
"fresh": "Färsk",
"refreshing": "Uppdaterar",
"searching_tomorrow": "Söker morgondagens data",
"turnover_pending": "Midnattskifte väntar",
"error": "Fel"
}
},
"today_volatility": {
"name": "Volatilitet idag",
@ -583,9 +591,6 @@
"peak_price_next_in_minutes": {
"name": "Topppris startar om"
},
"price_forecast": {
"name": "Prisprognos"
},
"home_type": {
"name": "Bostadstyp",
"state": {