mirror of
https://github.com/jpawlowski/hass.tibber_prices.git
synced 2026-04-07 08:03:40 +00:00
PLW0108: Three lambdas were simple pass-throughs that added no value: lambda data: aggregate_level_data(data) → aggregate_level_data lambda: lifecycle_calculator.get_lifecycle_state() → lifecycle_calculator.get_lifecycle_state Affected files: sensor/calculators/rolling_hour.py (line 115) sensor/helpers.py (line 139) sensor/value_getters.py (line 220) Impact: No behaviour change. Linter now passes with zero warnings.
195 lines
6.6 KiB
Python
195 lines
6.6 KiB
Python
"""
|
|
Sensor platform-specific helper functions.
|
|
|
|
This module contains helper functions specific to the sensor platform:
|
|
- aggregate_price_data: Calculate average price from window data
|
|
- aggregate_level_data: Aggregate price levels from intervals
|
|
- aggregate_rating_data: Aggregate price ratings from intervals
|
|
- aggregate_window_data: Unified aggregation based on value type
|
|
- get_hourly_price_value: Get price for specific hour with offset
|
|
|
|
For shared helper functions (used by both sensor and binary_sensor platforms),
|
|
see entity_utils/helpers.py:
|
|
- get_price_value: Price unit conversion
|
|
- translate_level: Price level translation
|
|
- translate_rating_level: Rating level translation
|
|
- find_rolling_hour_center_index: Rolling hour window calculations
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from datetime import timedelta
|
|
from typing import TYPE_CHECKING
|
|
|
|
if TYPE_CHECKING:
|
|
from custom_components.tibber_prices.coordinator.time_service import TibberPricesTimeService
|
|
from homeassistant.config_entries import ConfigEntry
|
|
|
|
from custom_components.tibber_prices.const import get_display_unit_factor
|
|
from custom_components.tibber_prices.coordinator.helpers import get_intervals_for_day_offsets
|
|
from custom_components.tibber_prices.entity_utils.helpers import get_price_value
|
|
from custom_components.tibber_prices.utils.average import calculate_mean, calculate_median
|
|
from custom_components.tibber_prices.utils.price import (
|
|
aggregate_price_levels,
|
|
aggregate_price_rating,
|
|
)
|
|
|
|
if TYPE_CHECKING:
|
|
from collections.abc import Callable
|
|
|
|
|
|
def aggregate_average_data(
|
|
window_data: list[dict],
|
|
config_entry: ConfigEntry,
|
|
) -> tuple[float | None, float | None]:
|
|
"""
|
|
Calculate average and median price from window data.
|
|
|
|
Args:
|
|
window_data: List of price interval dictionaries with 'total' key.
|
|
config_entry: Config entry to get display unit configuration.
|
|
|
|
Returns:
|
|
Tuple of (average price, median price) in display currency units,
|
|
or (None, None) if no prices.
|
|
|
|
"""
|
|
prices = [float(i["total"]) for i in window_data if "total" in i]
|
|
if not prices:
|
|
return None, None
|
|
# Calculate both mean and median
|
|
mean = calculate_mean(prices)
|
|
median = calculate_median(prices)
|
|
# Convert to display currency unit based on configuration
|
|
factor = get_display_unit_factor(config_entry)
|
|
return round(mean * factor, 2), round(median * factor, 2) if median is not None else None
|
|
|
|
|
|
def aggregate_level_data(window_data: list[dict]) -> str | None:
|
|
"""
|
|
Aggregate price levels from window data.
|
|
|
|
Args:
|
|
window_data: List of price interval dictionaries with 'level' key
|
|
|
|
Returns:
|
|
Aggregated price level (lowercase), or None if no levels
|
|
|
|
"""
|
|
levels = [i["level"] for i in window_data if "level" in i]
|
|
if not levels:
|
|
return None
|
|
aggregated = aggregate_price_levels(levels)
|
|
return aggregated.lower() if aggregated else None
|
|
|
|
|
|
def aggregate_rating_data(
|
|
window_data: list[dict],
|
|
threshold_low: float,
|
|
threshold_high: float,
|
|
) -> str | None:
|
|
"""
|
|
Aggregate price ratings from window data.
|
|
|
|
Args:
|
|
window_data: List of price interval dictionaries with 'difference' and 'rating_level'
|
|
threshold_low: Low threshold for rating calculation
|
|
threshold_high: High threshold for rating calculation
|
|
|
|
Returns:
|
|
Aggregated price rating (lowercase), or None if no ratings
|
|
|
|
"""
|
|
differences = [i["difference"] for i in window_data if "difference" in i and "rating_level" in i]
|
|
if not differences:
|
|
return None
|
|
|
|
aggregated, _ = aggregate_price_rating(differences, threshold_low, threshold_high)
|
|
return aggregated.lower() if aggregated else None
|
|
|
|
|
|
def aggregate_window_data(
|
|
window_data: list[dict],
|
|
value_type: str,
|
|
threshold_low: float,
|
|
threshold_high: float,
|
|
config_entry: ConfigEntry,
|
|
) -> str | float | None:
|
|
"""
|
|
Aggregate data from multiple intervals based on value type.
|
|
|
|
Unified helper that routes to appropriate aggregation function.
|
|
|
|
NOTE: This function is legacy code - rolling_hour calculator has its own implementation.
|
|
|
|
Args:
|
|
window_data: List of price interval dictionaries.
|
|
value_type: Type of value to aggregate ('price', 'level', or 'rating').
|
|
threshold_low: Low threshold for rating calculation.
|
|
threshold_high: High threshold for rating calculation.
|
|
config_entry: Config entry to get display unit configuration.
|
|
|
|
Returns:
|
|
Aggregated value (price as float, level/rating as str), or None if no data.
|
|
|
|
"""
|
|
# Map value types to aggregation functions
|
|
aggregators: dict[str, Callable] = {
|
|
"price": lambda data: aggregate_average_data(data, config_entry)[0], # Use only average from tuple
|
|
"level": aggregate_level_data,
|
|
"rating": lambda data: aggregate_rating_data(data, threshold_low, threshold_high),
|
|
}
|
|
|
|
aggregator = aggregators.get(value_type)
|
|
if aggregator:
|
|
return aggregator(window_data)
|
|
return None
|
|
|
|
|
|
def get_hourly_price_value(
|
|
coordinator_data: dict,
|
|
*,
|
|
hour_offset: int,
|
|
in_euro: bool,
|
|
time: TibberPricesTimeService,
|
|
) -> float | None:
|
|
"""
|
|
Get price for current hour or with offset.
|
|
|
|
Legacy helper for hourly price access (not used by Calculator Pattern).
|
|
Kept for potential backward compatibility.
|
|
|
|
Args:
|
|
coordinator_data: Coordinator data dict
|
|
hour_offset: Hour offset from current time (positive=future, negative=past)
|
|
in_euro: If True, return price in base currency (EUR), else minor (cents/øre)
|
|
time: TibberPricesTimeService instance (required)
|
|
|
|
Returns:
|
|
Price value, or None if not found
|
|
|
|
"""
|
|
# Use TimeService to get the current time in the user's timezone
|
|
now = time.now()
|
|
|
|
# Calculate the exact target datetime (not just the hour)
|
|
# This properly handles day boundaries
|
|
target_datetime = now.replace(microsecond=0) + timedelta(hours=hour_offset)
|
|
target_hour = target_datetime.hour
|
|
target_date = target_datetime.date()
|
|
|
|
# Get all intervals (yesterday, today, tomorrow) via helper
|
|
all_intervals = get_intervals_for_day_offsets(coordinator_data, [-1, 0, 1])
|
|
|
|
# Search through all intervals to find the matching hour
|
|
for price_data in all_intervals:
|
|
# Parse the timestamp and convert to local time
|
|
starts_at = time.get_interval_time(price_data)
|
|
if starts_at is None:
|
|
continue
|
|
|
|
# Compare using both hour and date for accuracy
|
|
if starts_at.hour == target_hour and starts_at.date() == target_date:
|
|
return get_price_value(float(price_data["total"]), in_euro=in_euro)
|
|
|
|
return None
|