hass.tibber_prices/custom_components/tibber_prices/sensor/helpers.py
Julian Pawlowski 625bc222ca refactor(coordinator): centralize time operations through TimeService
Introduce TimeService as single source of truth for all datetime operations,
replacing direct dt_util calls throughout the codebase. This establishes
consistent time context across update cycles and enables future time-travel
testing capability.

Core changes:
- NEW: coordinator/time_service.py with timezone-aware datetime API
- Coordinator now creates TimeService per update cycle, passes to calculators
- Timer callbacks (#2, #3) inject TimeService into entity update flow
- All sensor calculators receive TimeService via coordinator reference
- Attribute builders accept time parameter for timestamp calculations

Key patterns replaced:
- dt_util.now() → time.now() (single reference time per cycle)
- dt_util.parse_datetime() + as_local() → time.get_interval_time()
- Manual interval arithmetic → time.get_interval_offset_time()
- Manual day boundaries → time.get_day_boundaries()
- round_to_nearest_quarter_hour() → time.round_to_nearest_quarter()
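
The first replacement (one reference time per cycle) can be pictured with a minimal sketch. The class name `FrozenTime` and its constructor are hypothetical stand-ins for illustration, not the integration's actual TimeService; only the `now()` method name is taken from the list above.

```python
from __future__ import annotations

from datetime import datetime, timezone


class FrozenTime:
    """Hypothetical stand-in for a per-cycle time object (not the real TimeService)."""

    def __init__(self, reference: datetime | None = None) -> None:
        # Read the clock exactly once; every later now() call returns this value.
        self._reference = reference or datetime.now(timezone.utc)

    def now(self) -> datetime:
        return self._reference


cycle_time = FrozenTime()
first = cycle_time.now()
second = cycle_time.now()
# Both reads are identical, so values derived from them cannot drift within a cycle.
print(first == second)
```

Passing the same object to every calculator in an update cycle is what removes the millisecond differences between independent clock reads.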

Import cleanup:
- Removed dt_util imports from ~30 files (calculators, attributes, utils)
- Restricted dt_util to 3 modules: time_service.py (operations), api/client.py
  (rate limiting), entity_utils/icons.py (cosmetic updates)
- datetime/timedelta only for TYPE_CHECKING (type hints) or duration arithmetic

Interval resolution abstraction:
- Removed hardcoded MINUTES_PER_INTERVAL constant from 10+ files
- New methods: time.minutes_to_intervals(), time.get_interval_duration()
- Supports future 60-minute resolution (legacy data) via TimeService config
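
A rough sketch of what a configurable interval resolution could look like. The class `IntervalConfig`, its constructor argument, and the floor-division rounding are assumptions made for this example; the real `minutes_to_intervals()` and `get_interval_duration()` signatures and rounding behavior may differ.

```python
from datetime import timedelta


class IntervalConfig:
    """Hypothetical sketch; not the integration's TimeService."""

    def __init__(self, minutes_per_interval: int = 15) -> None:
        self._minutes_per_interval = minutes_per_interval

    def get_interval_duration(self) -> timedelta:
        return timedelta(minutes=self._minutes_per_interval)

    def minutes_to_intervals(self, minutes: int) -> int:
        # Assumption: partial intervals are dropped (floor division).
        return minutes // self._minutes_per_interval


quarter_hourly = IntervalConfig(15)
hourly = IntervalConfig(60)
print(quarter_hourly.minutes_to_intervals(120))  # 8
print(hourly.minutes_to_intervals(120))  # 2
```

Callers that ask the object for the interval count no longer need a module-level constant, so switching resolution becomes a configuration change.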

Timezone correctness:
- API timestamps (startsAt) already localized by data transformation
- TimeService operations preserve HA user timezone throughout
- DST transitions handled via get_expected_intervals_for_day() (future use)
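
To illustrate why the expected interval count depends on the date, here is a small sketch that counts 15-minute intervals in a local calendar day. The function name `expected_intervals_for_day` and its parameters are hypothetical and only loosely modeled on the method named above; the example assumes the system time zone database is available to `zoneinfo`.

```python
from datetime import date, datetime, time, timedelta, timezone
from zoneinfo import ZoneInfo


def expected_intervals_for_day(day: date, tz: ZoneInfo, minutes: int = 15) -> int:
    """Count intervals between local midnight and the next local midnight."""
    start = datetime.combine(day, time.min, tzinfo=tz)
    end = datetime.combine(day + timedelta(days=1), time.min, tzinfo=tz)
    # Convert to UTC first: subtracting two datetimes that share a tzinfo
    # object ignores the UTC offset change caused by DST.
    elapsed = end.astimezone(timezone.utc) - start.astimezone(timezone.utc)
    return elapsed // timedelta(minutes=minutes)


berlin = ZoneInfo("Europe/Berlin")
print(expected_intervals_for_day(date(2025, 3, 30), berlin))   # 92 (23-hour day)
print(expected_intervals_for_day(date(2025, 6, 15), berlin))   # 96 (regular day)
print(expected_intervals_for_day(date(2025, 10, 26), berlin))  # 100 (25-hour day)
```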

Timestamp ordering preserved:
- Attribute builders generate default timestamp (rounded quarter)
- Sensors override when needed (next interval, daily midnight, etc.)
- Platform ensures timestamp stays FIRST in attribute dict

Timer integration:
- Timer #2 (quarter-hour): Creates TimeService, calls _handle_time_sensitive_update(time)
- Timer #3 (30-second): Creates TimeService, calls _handle_minute_update(time)
- Consistent time reference for all entities in same update batch

Time-travel readiness:
- TimeService.with_reference_time() enables time injection (not yet used)
- All calculations use time.now() → easy to simulate past/future states
- Foundation for debugging period calculations with historical data
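
As a sketch of what an injected reference time makes possible, the day selection used by get_hourly_price_value in this file can be exercised at a fixed point in time. The helper `pick_day_key` is hypothetical and exists only for this example; it mirrors the `now.replace(microsecond=0) + timedelta(hours=hour_offset)` logic from the source.

```python
from datetime import datetime, timedelta, timezone


def pick_day_key(now: datetime, hour_offset: int) -> str:
    """Hypothetical helper mirroring the day selection in get_hourly_price_value."""
    target = now.replace(microsecond=0) + timedelta(hours=hour_offset)
    return "tomorrow" if target.date() > now.date() else "today"


# Injected reference time: 23:30 on a fixed date (UTC chosen only for simplicity).
late_evening = datetime(2025, 11, 19, 23, 30, tzinfo=timezone.utc)
print(pick_day_key(late_evening, 0))  # today
print(pick_day_key(late_evening, 1))  # tomorrow
```

Because the reference time is an argument rather than a clock read, the midnight rollover can be reproduced at any time of day.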

Impact: Eliminates timestamp drift within update cycles (previously 60+ independent
dt_util.now() calls could differ by milliseconds). Establishes architecture for
time-based testing and debugging features.
2025-11-19 18:36:12 +00:00

190 lines
6.2 KiB
Python

"""
Sensor platform-specific helper functions.

This module contains helper functions specific to the sensor platform:
- aggregate_price_data: Calculate average price from window data
- aggregate_level_data: Aggregate price levels from intervals
- aggregate_rating_data: Aggregate price ratings from intervals
- aggregate_window_data: Unified aggregation based on value type
- get_hourly_price_value: Get price for specific hour with offset

For shared helper functions (used by both sensor and binary_sensor platforms),
see entity_utils/helpers.py:
- get_price_value: Price unit conversion
- translate_level: Price level translation
- translate_rating_level: Rating level translation
- find_rolling_hour_center_index: Rolling hour window calculations
"""

from __future__ import annotations

from datetime import timedelta
from typing import TYPE_CHECKING

from custom_components.tibber_prices.entity_utils.helpers import get_price_value
from custom_components.tibber_prices.utils.price import (
    aggregate_price_levels,
    aggregate_price_rating,
)

if TYPE_CHECKING:
    # Imported for type hints only; not needed at runtime.
    from collections.abc import Callable

    from custom_components.tibber_prices.coordinator.time_service import TimeService


def aggregate_price_data(window_data: list[dict]) -> float | None:
    """
    Calculate average price from window data.

    Args:
        window_data: List of price interval dictionaries with 'total' key

    Returns:
        Average price in minor currency units (cents/øre), or None if no prices

    """
    prices = [float(i["total"]) for i in window_data if "total" in i]
    if not prices:
        return None
    # Return in minor currency units (cents/øre)
    return round((sum(prices) / len(prices)) * 100, 2)


def aggregate_level_data(window_data: list[dict]) -> str | None:
    """
    Aggregate price levels from window data.

    Args:
        window_data: List of price interval dictionaries with 'level' key

    Returns:
        Aggregated price level (lowercase), or None if no levels

    """
    levels = [i["level"] for i in window_data if "level" in i]
    if not levels:
        return None
    aggregated = aggregate_price_levels(levels)
    return aggregated.lower() if aggregated else None


def aggregate_rating_data(
    window_data: list[dict],
    threshold_low: float,
    threshold_high: float,
) -> str | None:
    """
    Aggregate price ratings from window data.

    Args:
        window_data: List of price interval dictionaries with 'difference' and 'rating_level'
        threshold_low: Low threshold for rating calculation
        threshold_high: High threshold for rating calculation

    Returns:
        Aggregated price rating (lowercase), or None if no ratings

    """
    differences = [i["difference"] for i in window_data if "difference" in i and "rating_level" in i]
    if not differences:
        return None
    aggregated, _ = aggregate_price_rating(differences, threshold_low, threshold_high)
    return aggregated.lower() if aggregated else None


def aggregate_window_data(
    window_data: list[dict],
    value_type: str,
    threshold_low: float,
    threshold_high: float,
) -> str | float | None:
    """
    Aggregate data from multiple intervals based on value type.

    Unified helper that routes to appropriate aggregation function.

    Args:
        window_data: List of price interval dictionaries
        value_type: Type of value to aggregate ('price', 'level', or 'rating')
        threshold_low: Low threshold for rating calculation
        threshold_high: High threshold for rating calculation

    Returns:
        Aggregated value (price as float, level/rating as str), or None if no data

    """
    # Map value types to aggregation functions
    aggregators: dict[str, Callable] = {
        "price": aggregate_price_data,
        "level": aggregate_level_data,
        # Only the rating aggregator needs the thresholds, so it is wrapped in a lambda
        "rating": lambda data: aggregate_rating_data(data, threshold_low, threshold_high),
    }
    aggregator = aggregators.get(value_type)
    if aggregator:
        return aggregator(window_data)
    return None


def get_hourly_price_value(
    price_info: dict,
    *,
    hour_offset: int,
    in_euro: bool,
    time: TimeService,
) -> float | None:
    """
    Get price for current hour or with offset.

    Legacy helper for hourly price access (not used by Calculator Pattern).
    Kept for potential backward compatibility.

    Args:
        price_info: Price information dict with 'today' and 'tomorrow' keys
        hour_offset: Hour offset from current time (positive=future, negative=past)
        in_euro: If True, return price in major currency (EUR), else minor (cents/øre)
        time: TimeService instance (required)

    Returns:
        Price value, or None if not found

    """
    # Use TimeService to get the current time in the user's timezone
    now = time.now()

    # Calculate the exact target datetime (not just the hour)
    # This properly handles day boundaries
    target_datetime = now.replace(microsecond=0) + timedelta(hours=hour_offset)
    target_hour = target_datetime.hour
    target_date = target_datetime.date()

    # Determine which day's data we need; the other day is checked afterwards
    # as a fallback for potential edge cases
    day_key = "tomorrow" if target_date > now.date() else "today"
    other_day_key = "today" if day_key == "tomorrow" else "tomorrow"

    for key in (day_key, other_day_key):
        for price_data in price_info.get(key, []):
            # Interval start time as provided by TimeService
            starts_at = time.get_interval_time(price_data)
            if starts_at is None:
                continue
            # Compare using both hour and date for accuracy; the first interval
            # that starts within the target hour is returned
            if starts_at.hour == target_hour and starts_at.date() == target_date:
                return get_price_value(float(price_data["total"]), in_euro=in_euro)

    return None
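
For a quick check of the averaging and unit conversion in aggregate_price_data, the function body can be run in isolation. The copy below repeats the logic from this file without type hints so it is self-contained; the sample window values are made up for the example.

```python
def aggregate_price_data(window_data):
    """Standalone copy of the averaging logic shown above."""
    prices = [float(i["total"]) for i in window_data if "total" in i]
    if not prices:
        return None
    # Major currency units are scaled by 100 to get minor units (cents/øre).
    return round((sum(prices) / len(prices)) * 100, 2)


# Two priced intervals plus one entry without a 'total' key, which is skipped.
window = [{"total": 0.25}, {"total": 0.5}, {"level": "CHEAP"}]
print(aggregate_price_data(window))  # 37.5
print(aggregate_price_data([]))      # None
```

Entries without a 'total' key do not count toward the average, so a partially filled window still yields a value as long as one price is present.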