hass.tibber_prices/custom_components/tibber_prices/sensor/attributes/future.py
Julian Pawlowski 189d3ba84d feat(sensor): add data lifecycle diagnostic sensor with push updates
Add comprehensive data_lifecycle_status sensor showing real-time cache
vs fresh API data status with 6 states and 13+ detailed attributes.

Key features:
- 6 lifecycle states: cached, fresh, refreshing, searching_tomorrow,
  turnover_pending, error
- Push-update system for instant state changes (refreshing→fresh→error)
- Quarter-hour polling for turnover_pending detection at 23:45
- Accurate next_api_poll prediction using Timer #1 offset tracking
- Tomorrow prediction with actual timer schedule (not fixed 13:00)
- 13+ formatted attributes: cache_age, data_completeness, api_calls_today,
  next_api_poll, etc.

Implementation:
- sensor/calculators/lifecycle.py: New calculator with state logic
- sensor/attributes/lifecycle.py: Attribute builders with formatting
- coordinator/core.py: Lifecycle tracking + callback system (+16 lines)
- sensor/core.py: Push callback registration (+3 lines)
- coordinator/constants.py: Added to TIME_SENSITIVE_ENTITY_KEYS
- Translations: All 5 languages (de, en, nb, nl, sv)

Timing optimization:
- Extended turnover warning: 5min → 15min (catches 23:45 quarter boundary)
- No minute-timer needed: quarter-hour updates + push = optimal
- Push-updates: <1sec latency for refreshing/fresh/error states
- Timer offset tracking: Accurate tomorrow predictions

Removed obsolete sensors:
- data_timestamp (replaced by lifecycle attributes)
- price_forecast (never implemented, removed from definitions)

Impact: Users can monitor data freshness, API call patterns, cache age,
and understand integration behavior. Perfect for troubleshooting and
visibility into when data updates occur.
2025-11-20 15:12:41 +00:00

131 lines
4.3 KiB
Python

"""Future price/trend attribute builders for Tibber Prices sensors."""
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from custom_components.tibber_prices.coordinator.core import (
TibberPricesDataUpdateCoordinator,
)
from custom_components.tibber_prices.coordinator.time_service import TibberPricesTimeService
# Constants
MAX_FORECAST_INTERVALS = 8 # Show up to 8 future intervals (2 hours with 15-min intervals)
def add_next_avg_attributes(
attributes: dict,
key: str,
coordinator: TibberPricesDataUpdateCoordinator,
*,
time: TibberPricesTimeService,
) -> None:
"""
Add attributes for next N hours average price sensors.
Args:
attributes: Dictionary to add attributes to
key: The sensor entity key
coordinator: The data update coordinator
time: TibberPricesTimeService instance (required)
"""
# Extract hours from sensor key (e.g., "next_avg_3h" -> 3)
try:
hours = int(key.split("_")[-1].replace("h", ""))
except (ValueError, AttributeError):
return
# Use TimeService to get the N-hour window starting from next interval
next_interval_start, window_end = time.get_next_n_hours_window(hours)
# Get all price intervals
price_info = coordinator.data.get("priceInfo", {})
today_prices = price_info.get("today", [])
tomorrow_prices = price_info.get("tomorrow", [])
all_prices = today_prices + tomorrow_prices
if not all_prices:
return
# Find all intervals in the window
intervals_in_window = []
for price_data in all_prices:
starts_at = time.get_interval_time(price_data)
if starts_at is None:
continue
if next_interval_start <= starts_at < window_end:
intervals_in_window.append(price_data)
# Add timestamp attribute (start of next interval - where calculation begins)
if intervals_in_window:
attributes["timestamp"] = intervals_in_window[0].get("startsAt")
attributes["interval_count"] = len(intervals_in_window)
attributes["hours"] = hours
def get_future_prices(
coordinator: TibberPricesDataUpdateCoordinator,
max_intervals: int | None = None,
*,
time: TibberPricesTimeService,
) -> list[dict] | None:
"""
Get future price data for multiple upcoming intervals.
Args:
coordinator: The data update coordinator
max_intervals: Maximum number of future intervals to return
time: TibberPricesTimeService instance (required)
Returns:
List of upcoming price intervals with timestamps and prices
"""
if not coordinator.data:
return None
price_info = coordinator.data.get("priceInfo", {})
today_prices = price_info.get("today", [])
tomorrow_prices = price_info.get("tomorrow", [])
all_prices = today_prices + tomorrow_prices
if not all_prices:
return None
# Initialize the result list
future_prices = []
# Track the maximum intervals to return
intervals_to_return = MAX_FORECAST_INTERVALS if max_intervals is None else max_intervals
for day_key in ["today", "tomorrow"]:
for price_data in price_info.get(day_key, []):
starts_at = time.get_interval_time(price_data)
if starts_at is None:
continue
interval_end = starts_at + time.get_interval_duration()
# Use TimeService to check if interval is in future
if time.is_in_future(starts_at):
future_prices.append(
{
"interval_start": starts_at,
"interval_end": interval_end,
"price": float(price_data["total"]),
"price_minor": round(float(price_data["total"]) * 100, 2),
"level": price_data.get("level", "NORMAL"),
"rating": price_data.get("difference", None),
"rating_level": price_data.get("rating_level"),
"day": day_key,
}
)
# Sort by start time
future_prices.sort(key=lambda x: x["interval_start"])
# Limit to the requested number of intervals
return future_prices[:intervals_to_return] if future_prices else None