hass.tibber_prices/custom_components/tibber_prices/sensor/attributes/volatility.py
Julian Pawlowski 625bc222ca refactor(coordinator): centralize time operations through TimeService
Introduce TimeService as single source of truth for all datetime operations,
replacing direct dt_util calls throughout the codebase. This establishes
consistent time context across update cycles and enables future time-travel
testing capability.

Core changes:
- NEW: coordinator/time_service.py with timezone-aware datetime API
- Coordinator now creates TimeService per update cycle, passes to calculators
- Timer callbacks (#2, #3) inject TimeService into entity update flow
- All sensor calculators receive TimeService via coordinator reference
- Attribute builders accept time parameter for timestamp calculations

Key patterns replaced:
- dt_util.now() → time.now() (single reference time per cycle)
- dt_util.parse_datetime() + as_local() → time.get_interval_time()
- Manual interval arithmetic → time.get_interval_offset_time()
- Manual day boundaries → time.get_day_boundaries()
- round_to_nearest_quarter_hour() → time.round_to_nearest_quarter()

Import cleanup:
- Removed dt_util imports from ~30 files (calculators, attributes, utils)
- Restricted dt_util to 3 modules: time_service.py (operations), api/client.py
  (rate limiting), entity_utils/icons.py (cosmetic updates)
- datetime/timedelta only for TYPE_CHECKING (type hints) or duration arithmetic

Interval resolution abstraction:
- Removed hardcoded MINUTES_PER_INTERVAL constant from 10+ files
- New methods: time.minutes_to_intervals(), time.get_interval_duration()
- Supports future 60-minute resolution (legacy data) via TimeService config

Timezone correctness:
- API timestamps (startsAt) already localized by data transformation
- TimeService operations preserve HA user timezone throughout
- DST transitions handled via get_expected_intervals_for_day() (future use)

Timestamp ordering preserved:
- Attribute builders generate default timestamp (rounded quarter)
- Sensors override when needed (next interval, daily midnight, etc.)
- Platform ensures timestamp stays FIRST in attribute dict

Timer integration:
- Timer #2 (quarter-hour): Creates TimeService, calls _handle_time_sensitive_update(time)
- Timer #3 (30-second): Creates TimeService, calls _handle_minute_update(time)
- Consistent time reference for all entities in same update batch

Time-travel readiness:
- TimeService.with_reference_time() enables time injection (not yet used)
- All calculations use time.now() → easy to simulate past/future states
- Foundation for debugging period calculations with historical data

Impact: Eliminates timestamp drift within update cycles (previously 60+ independent
dt_util.now() calls could differ by milliseconds). Establishes architecture for
time-based testing and debugging features.
2025-11-19 18:36:12 +00:00

139 lines
4.9 KiB
Python

"""Volatility attribute builders for Tibber Prices sensors."""
from __future__ import annotations
from datetime import timedelta
from typing import TYPE_CHECKING
from custom_components.tibber_prices.utils.price import calculate_volatility_level
if TYPE_CHECKING:
from custom_components.tibber_prices.coordinator.time_service import TimeService
def add_volatility_attributes(
attributes: dict,
cached_data: dict,
*,
time: TimeService, # noqa: ARG001
) -> None:
"""
Add attributes for volatility sensors.
Args:
attributes: Dictionary to add attributes to
cached_data: Dictionary containing cached sensor data
time: TimeService instance (required)
"""
if cached_data.get("volatility_attributes"):
attributes.update(cached_data["volatility_attributes"])
def get_prices_for_volatility(
volatility_type: str,
price_info: dict,
*,
time: TimeService,
) -> list[float]:
"""
Get price list for volatility calculation based on type.
Args:
volatility_type: One of "today", "tomorrow", "next_24h", "today_tomorrow"
price_info: Price information dictionary from coordinator data
time: TimeService instance (required)
Returns:
List of prices to analyze
"""
if volatility_type == "today":
return [float(p["total"]) for p in price_info.get("today", []) if "total" in p]
if volatility_type == "tomorrow":
return [float(p["total"]) for p in price_info.get("tomorrow", []) if "total" in p]
if volatility_type == "next_24h":
# Rolling 24h from now
now = time.now()
end_time = now + timedelta(hours=24)
prices = []
for day_key in ["today", "tomorrow"]:
for price_data in price_info.get(day_key, []):
starts_at = price_data.get("startsAt") # Already datetime in local timezone
if starts_at is None:
continue
if time.is_in_future(starts_at) and starts_at < end_time and "total" in price_data:
prices.append(float(price_data["total"]))
return prices
if volatility_type == "today_tomorrow":
# Combined today + tomorrow
prices = []
for day_key in ["today", "tomorrow"]:
for price_data in price_info.get(day_key, []):
if "total" in price_data:
prices.append(float(price_data["total"]))
return prices
return []
def add_volatility_type_attributes(
volatility_attributes: dict,
volatility_type: str,
price_info: dict,
thresholds: dict,
*,
time: TimeService,
) -> None:
"""
Add type-specific attributes for volatility sensors.
Args:
volatility_attributes: Dictionary to add type-specific attributes to
volatility_type: Type of volatility calculation
price_info: Price information dictionary from coordinator data
thresholds: Volatility thresholds configuration
time: TimeService instance (required)
"""
# Add timestamp for calendar day volatility sensors (midnight of the day)
if volatility_type == "today":
today_data = price_info.get("today", [])
if today_data:
volatility_attributes["timestamp"] = today_data[0].get("startsAt")
elif volatility_type == "tomorrow":
tomorrow_data = price_info.get("tomorrow", [])
if tomorrow_data:
volatility_attributes["timestamp"] = tomorrow_data[0].get("startsAt")
elif volatility_type == "today_tomorrow":
# For combined today+tomorrow, use today's midnight
today_data = price_info.get("today", [])
if today_data:
volatility_attributes["timestamp"] = today_data[0].get("startsAt")
# Add breakdown for today vs tomorrow
today_prices = [float(p["total"]) for p in price_info.get("today", []) if "total" in p]
tomorrow_prices = [float(p["total"]) for p in price_info.get("tomorrow", []) if "total" in p]
if today_prices:
today_vol = calculate_volatility_level(today_prices, **thresholds)
today_spread = (max(today_prices) - min(today_prices)) * 100
volatility_attributes["today_spread"] = round(today_spread, 2)
volatility_attributes["today_volatility"] = today_vol
volatility_attributes["interval_count_today"] = len(today_prices)
if tomorrow_prices:
tomorrow_vol = calculate_volatility_level(tomorrow_prices, **thresholds)
tomorrow_spread = (max(tomorrow_prices) - min(tomorrow_prices)) * 100
volatility_attributes["tomorrow_spread"] = round(tomorrow_spread, 2)
volatility_attributes["tomorrow_volatility"] = tomorrow_vol
volatility_attributes["interval_count_tomorrow"] = len(tomorrow_prices)
elif volatility_type == "next_24h":
# Add time window info
now = time.now()
volatility_attributes["timestamp"] = now