feat(sensors): always show both mean and median in average sensor attributes

Implemented configurable display format (mean/median/both) while always
calculating and exposing both price_mean and price_median attributes.

Core changes:
- utils/average.py: Refactored calculate_mean_median() to always return both
  values, added comprehensive None handling (117 lines changed)
- sensor/attributes/helpers.py: Always include both attributes regardless of
  user display preference (41 lines)
- sensor/core.py: Dynamic _unrecorded_attributes based on display setting
  (55 lines), extracted helper methods to reduce complexity
- Updated all calculators (rolling_hour, trend, volatility, window_24h) to
  use new always-both approach

Impact: Users can switch display format in UI without losing historical data.
Automation authors always have access to both statistical measures.
This commit is contained in:
Julian Pawlowski 2025-12-18 15:12:30 +00:00
parent 29e934d66b
commit abb02083a7
11 changed files with 208 additions and 130 deletions

View file

@ -4,11 +4,6 @@ from __future__ import annotations
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from custom_components.tibber_prices.const import (
CONF_AVERAGE_SENSOR_DISPLAY,
DEFAULT_AVERAGE_SENSOR_DISPLAY,
)
if TYPE_CHECKING: if TYPE_CHECKING:
from custom_components.tibber_prices.data import TibberPricesConfigEntry from custom_components.tibber_prices.data import TibberPricesConfigEntry
@ -18,35 +13,29 @@ def add_alternate_average_attribute(
cached_data: dict, cached_data: dict,
base_key: str, base_key: str,
*, *,
config_entry: TibberPricesConfigEntry, config_entry: TibberPricesConfigEntry, # noqa: ARG001
) -> None: ) -> None:
""" """
Add the alternate average value (mean or median) as attribute. Add both average values (mean and median) as attributes.
If user selected "median" as state display, adds "price_mean" as attribute. This ensures automations work consistently regardless of which value
If user selected "mean" as state display, adds "price_median" as attribute. is displayed in the state. Both values are always available as attributes.
Note: To avoid duplicate recording, the value used as state should be
excluded from recorder via dynamic _unrecorded_attributes in sensor core.
Args: Args:
attributes: Dictionary to add attribute to attributes: Dictionary to add attribute to
cached_data: Cached calculation data containing mean/median values cached_data: Cached calculation data containing mean/median values
base_key: Base key for cached values (e.g., "average_price_today", "rolling_hour_0") base_key: Base key for cached values (e.g., "average_price_today", "rolling_hour_0")
config_entry: Config entry for user preferences config_entry: Config entry for user preferences (used to determine which value is in state)
""" """
# Get user preference for which value to display in state # Always add both mean and median values as attributes
display_mode = config_entry.options.get(
CONF_AVERAGE_SENSOR_DISPLAY,
DEFAULT_AVERAGE_SENSOR_DISPLAY,
)
# Add the alternate value as attribute
if display_mode == "median":
# State shows median → add mean as attribute
mean_value = cached_data.get(f"{base_key}_mean") mean_value = cached_data.get(f"{base_key}_mean")
if mean_value is not None: if mean_value is not None:
attributes["price_mean"] = mean_value attributes["price_mean"] = mean_value
else:
# State shows mean → add median as attribute
median_value = cached_data.get(f"{base_key}_median") median_value = cached_data.get(f"{base_key}_median")
if median_value is not None: if median_value is not None:
attributes["price_median"] = median_value attributes["price_median"] = median_value

View file

@ -11,8 +11,8 @@ from custom_components.tibber_prices.const import (
from custom_components.tibber_prices.coordinator.helpers import get_intervals_for_day_offsets from custom_components.tibber_prices.coordinator.helpers import get_intervals_for_day_offsets
from custom_components.tibber_prices.entity_utils import find_rolling_hour_center_index from custom_components.tibber_prices.entity_utils import find_rolling_hour_center_index
from custom_components.tibber_prices.sensor.helpers import ( from custom_components.tibber_prices.sensor.helpers import (
aggregate_average_data,
aggregate_level_data, aggregate_level_data,
aggregate_price_data,
aggregate_rating_data, aggregate_rating_data,
) )
@ -108,7 +108,7 @@ class TibberPricesRollingHourCalculator(TibberPricesBaseCalculator):
# Handle price aggregation - return tuple directly # Handle price aggregation - return tuple directly
if value_type == "price": if value_type == "price":
return aggregate_price_data(window_data, self.config_entry) return aggregate_average_data(window_data, self.config_entry)
# Map other value types to aggregation functions # Map other value types to aggregation functions
aggregators = { aggregators = {

View file

@ -17,7 +17,7 @@ from typing import TYPE_CHECKING, Any
from custom_components.tibber_prices.const import get_display_unit_factor from custom_components.tibber_prices.const import get_display_unit_factor
from custom_components.tibber_prices.coordinator.helpers import get_intervals_for_day_offsets from custom_components.tibber_prices.coordinator.helpers import get_intervals_for_day_offsets
from custom_components.tibber_prices.utils.average import calculate_next_n_hours_avg from custom_components.tibber_prices.utils.average import calculate_mean, calculate_next_n_hours_mean
from custom_components.tibber_prices.utils.price import ( from custom_components.tibber_prices.utils.price import (
calculate_price_trend, calculate_price_trend,
find_price_data_for_interval, find_price_data_for_interval,
@ -97,9 +97,9 @@ class TibberPricesTrendCalculator(TibberPricesBaseCalculator):
# Get next interval timestamp (basis for calculation) # Get next interval timestamp (basis for calculation)
next_interval_start = time.get_next_interval_start() next_interval_start = time.get_next_interval_start()
# Get future average price # Get future mean price (ignore median for trend calculation)
future_avg, _ = calculate_next_n_hours_avg(self.coordinator.data, hours, time=self.coordinator.time) future_mean, _ = calculate_next_n_hours_mean(self.coordinator.data, hours, time=self.coordinator.time)
if future_avg is None: if future_mean is None:
return None return None
# Get configured thresholds from options # Get configured thresholds from options
@ -117,7 +117,7 @@ class TibberPricesTrendCalculator(TibberPricesBaseCalculator):
# Calculate trend with volatility-adaptive thresholds # Calculate trend with volatility-adaptive thresholds
trend_state, diff_pct = calculate_price_trend( trend_state, diff_pct = calculate_price_trend(
current_interval_price, current_interval_price,
future_avg, future_mean,
threshold_rising=threshold_rising, threshold_rising=threshold_rising,
threshold_falling=threshold_falling, threshold_falling=threshold_falling,
volatility_adjustment=True, # Always enabled volatility_adjustment=True, # Always enabled
@ -141,7 +141,7 @@ class TibberPricesTrendCalculator(TibberPricesBaseCalculator):
self._trend_attributes = { self._trend_attributes = {
"timestamp": next_interval_start, "timestamp": next_interval_start,
f"trend_{hours}h_%": round(diff_pct, 1), f"trend_{hours}h_%": round(diff_pct, 1),
f"next_{hours}h_avg": round(future_avg * factor, 2), f"next_{hours}h_avg": round(future_mean * factor, 2),
"interval_count": lookahead_intervals, "interval_count": lookahead_intervals,
"threshold_rising": threshold_rising, "threshold_rising": threshold_rising,
"threshold_falling": threshold_falling, "threshold_falling": threshold_falling,
@ -282,7 +282,7 @@ class TibberPricesTrendCalculator(TibberPricesBaseCalculator):
later_prices.append(float(price)) later_prices.append(float(price))
if later_prices: if later_prices:
return sum(later_prices) / len(later_prices) return calculate_mean(later_prices)
return None return None
@ -349,11 +349,11 @@ class TibberPricesTrendCalculator(TibberPricesBaseCalculator):
# Combine momentum + future outlook to get ACTUAL current trend # Combine momentum + future outlook to get ACTUAL current trend
if len(future_intervals) >= min_intervals_for_trend and future_prices: if len(future_intervals) >= min_intervals_for_trend and future_prices:
future_avg = sum(future_prices) / len(future_prices) future_mean = calculate_mean(future_prices)
current_trend_state = self._combine_momentum_with_future( current_trend_state = self._combine_momentum_with_future(
current_momentum=current_momentum, current_momentum=current_momentum,
current_price=current_price, current_price=current_price,
future_avg=future_avg, future_mean=future_mean,
context={ context={
"all_intervals": all_intervals, "all_intervals": all_intervals,
"current_index": current_index, "current_index": current_index,
@ -466,7 +466,7 @@ class TibberPricesTrendCalculator(TibberPricesBaseCalculator):
*, *,
current_momentum: str, current_momentum: str,
current_price: float, current_price: float,
future_avg: float, future_mean: float,
context: dict, context: dict,
) -> str: ) -> str:
""" """
@ -475,7 +475,7 @@ class TibberPricesTrendCalculator(TibberPricesBaseCalculator):
Args: Args:
current_momentum: Current momentum direction (rising/falling/stable) current_momentum: Current momentum direction (rising/falling/stable)
current_price: Current interval price current_price: Current interval price
future_avg: Average price in future window future_mean: Average price in future window
context: Dict with all_intervals, current_index, lookahead_intervals, thresholds context: Dict with all_intervals, current_index, lookahead_intervals, thresholds
Returns: Returns:
@ -484,11 +484,11 @@ class TibberPricesTrendCalculator(TibberPricesBaseCalculator):
""" """
if current_momentum == "rising": if current_momentum == "rising":
# We're in uptrend - does it continue? # We're in uptrend - does it continue?
return "rising" if future_avg >= current_price * 0.98 else "falling" return "rising" if future_mean >= current_price * 0.98 else "falling"
if current_momentum == "falling": if current_momentum == "falling":
# We're in downtrend - does it continue? # We're in downtrend - does it continue?
return "falling" if future_avg <= current_price * 1.02 else "rising" return "falling" if future_mean <= current_price * 1.02 else "rising"
# current_momentum == "stable" - what's coming? # current_momentum == "stable" - what's coming?
all_intervals = context["all_intervals"] all_intervals = context["all_intervals"]
@ -499,7 +499,7 @@ class TibberPricesTrendCalculator(TibberPricesBaseCalculator):
lookahead_for_volatility = all_intervals[current_index : current_index + lookahead_intervals] lookahead_for_volatility = all_intervals[current_index : current_index + lookahead_intervals]
trend_state, _ = calculate_price_trend( trend_state, _ = calculate_price_trend(
current_price, current_price,
future_avg, future_mean,
threshold_rising=thresholds["rising"], threshold_rising=thresholds["rising"],
threshold_falling=thresholds["falling"], threshold_falling=thresholds["falling"],
volatility_adjustment=True, volatility_adjustment=True,
@ -530,13 +530,13 @@ class TibberPricesTrendCalculator(TibberPricesBaseCalculator):
if not standard_future_prices: if not standard_future_prices:
return "stable" return "stable"
standard_future_avg = sum(standard_future_prices) / len(standard_future_prices) standard_future_mean = calculate_mean(standard_future_prices)
current_price = float(current_interval["total"]) current_price = float(current_interval["total"])
standard_lookahead_volatility = all_intervals[current_index : current_index + standard_lookahead] standard_lookahead_volatility = all_intervals[current_index : current_index + standard_lookahead]
current_trend_3h, _ = calculate_price_trend( current_trend_3h, _ = calculate_price_trend(
current_price, current_price,
standard_future_avg, standard_future_mean,
threshold_rising=thresholds["rising"], threshold_rising=thresholds["rising"],
threshold_falling=thresholds["falling"], threshold_falling=thresholds["falling"],
volatility_adjustment=True, volatility_adjustment=True,
@ -601,14 +601,14 @@ class TibberPricesTrendCalculator(TibberPricesBaseCalculator):
if not future_prices: if not future_prices:
continue continue
future_avg = sum(future_prices) / len(future_prices) future_mean = calculate_mean(future_prices)
price = float(interval["total"]) price = float(interval["total"])
# Calculate trend at this past point # Calculate trend at this past point
lookahead_for_volatility = all_intervals[i : i + intervals_in_3h] lookahead_for_volatility = all_intervals[i : i + intervals_in_3h]
trend_state, _ = calculate_price_trend( trend_state, _ = calculate_price_trend(
price, price,
future_avg, future_mean,
threshold_rising=thresholds["rising"], threshold_rising=thresholds["rising"],
threshold_falling=thresholds["falling"], threshold_falling=thresholds["falling"],
volatility_adjustment=True, volatility_adjustment=True,
@ -673,14 +673,14 @@ class TibberPricesTrendCalculator(TibberPricesBaseCalculator):
if not future_prices: if not future_prices:
continue continue
future_avg = sum(future_prices) / len(future_prices) future_mean = calculate_mean(future_prices)
current_price = float(interval["total"]) current_price = float(interval["total"])
# Calculate trend at this future point # Calculate trend at this future point
lookahead_for_volatility = all_intervals[i : i + intervals_in_3h] lookahead_for_volatility = all_intervals[i : i + intervals_in_3h]
trend_state, _ = calculate_price_trend( trend_state, _ = calculate_price_trend(
current_price, current_price,
future_avg, future_mean,
threshold_rising=thresholds["rising"], threshold_rising=thresholds["rising"],
threshold_falling=thresholds["falling"], threshold_falling=thresholds["falling"],
volatility_adjustment=True, volatility_adjustment=True,
@ -706,8 +706,8 @@ class TibberPricesTrendCalculator(TibberPricesBaseCalculator):
"minutes_until_change": minutes_until, "minutes_until_change": minutes_until,
"current_price_now": round(float(current_interval["total"]) * factor, 2), "current_price_now": round(float(current_interval["total"]) * factor, 2),
"price_at_change": round(current_price * factor, 2), "price_at_change": round(current_price * factor, 2),
"avg_after_change": round(future_avg * factor, 2), "avg_after_change": round(future_mean * factor, 2),
"trend_diff_%": round((future_avg - current_price) / current_price * 100, 1), "trend_diff_%": round((future_mean - current_price) / current_price * 100, 1),
} }
return interval_start return interval_start

View file

@ -10,6 +10,7 @@ from custom_components.tibber_prices.sensor.attributes import (
add_volatility_type_attributes, add_volatility_type_attributes,
get_prices_for_volatility, get_prices_for_volatility,
) )
from custom_components.tibber_prices.utils.average import calculate_mean
from custom_components.tibber_prices.utils.price import calculate_volatility_level from custom_components.tibber_prices.utils.price import calculate_volatility_level
from .base import TibberPricesBaseCalculator from .base import TibberPricesBaseCalculator
@ -75,7 +76,7 @@ class TibberPricesVolatilityCalculator(TibberPricesBaseCalculator):
price_max = max(prices_to_analyze) price_max = max(prices_to_analyze)
spread = price_max - price_min spread = price_max - price_min
# Use arithmetic mean for volatility calculation (required for coefficient of variation) # Use arithmetic mean for volatility calculation (required for coefficient of variation)
price_mean = sum(prices_to_analyze) / len(prices_to_analyze) price_mean = calculate_mean(prices_to_analyze)
# Convert to display currency unit based on configuration # Convert to display currency unit based on configuration
factor = get_display_unit_factor(self.config_entry) factor = get_display_unit_factor(self.config_entry)

View file

@ -33,11 +33,11 @@ class TibberPricesWindow24hCalculator(TibberPricesBaseCalculator):
- "leading": Next 24 hours (96 intervals after current) - "leading": Next 24 hours (96 intervals after current)
Args: Args:
stat_func: Function from average_utils (e.g., calculate_current_trailing_avg). stat_func: Function from average_utils (e.g., calculate_current_trailing_mean).
Returns: Returns:
Price value in subunit currency units (cents/øre), or None if unavailable. Price value in subunit currency units (cents/øre), or None if unavailable.
For average functions: tuple of (avg, median) where median may be None. For mean functions: tuple of (mean, median) where median may be None.
For min/max functions: single float value. For min/max functions: single float value.
""" """
@ -46,19 +46,19 @@ class TibberPricesWindow24hCalculator(TibberPricesBaseCalculator):
result = stat_func(self.coordinator_data, time=self.coordinator.time) result = stat_func(self.coordinator_data, time=self.coordinator.time)
# Check if result is a tuple (avg, median) from average functions # Check if result is a tuple (mean, median) from mean functions
if isinstance(result, tuple): if isinstance(result, tuple):
value, median = result value, median = result
if value is None: if value is None:
return None return None
# Convert to display currency units based on config # Convert to display currency units based on config
avg_result = round(get_price_value(value, config_entry=self.coordinator.config_entry), 2) mean_result = round(get_price_value(value, config_entry=self.coordinator.config_entry), 2)
median_result = ( median_result = (
round(get_price_value(median, config_entry=self.coordinator.config_entry), 2) round(get_price_value(median, config_entry=self.coordinator.config_entry), 2)
if median is not None if median is not None
else None else None
) )
return avg_result, median_result return mean_result, median_result
# Single value result (min/max functions) # Single value result (min/max functions)
value = result value = result

View file

@ -40,7 +40,7 @@ from custom_components.tibber_prices.entity_utils.icons import (
get_dynamic_icon, get_dynamic_icon,
) )
from custom_components.tibber_prices.utils.average import ( from custom_components.tibber_prices.utils.average import (
calculate_next_n_hours_avg, calculate_next_n_hours_mean,
) )
from custom_components.tibber_prices.utils.price import ( from custom_components.tibber_prices.utils.price import (
calculate_volatility_level, calculate_volatility_level,
@ -100,7 +100,7 @@ MIN_HOURS_FOR_LATER_HALF = 3 # Minimum hours needed to calculate later half ave
class TibberPricesSensor(TibberPricesEntity, RestoreSensor): class TibberPricesSensor(TibberPricesEntity, RestoreSensor):
"""tibber_prices Sensor class with state restoration.""" """tibber_prices Sensor class with state restoration."""
# Attributes excluded from recorder history # Base attributes excluded from recorder history (shared across all sensors)
# See: https://developers.home-assistant.io/docs/core/entity/#excluding-state-attributes-from-recorder-history # See: https://developers.home-assistant.io/docs/core/entity/#excluding-state-attributes-from-recorder-history
_unrecorded_attributes = frozenset( _unrecorded_attributes = frozenset(
{ {
@ -190,7 +190,48 @@ class TibberPricesSensor(TibberPricesEntity, RestoreSensor):
"""When entity is added to hass.""" """When entity is added to hass."""
await super().async_added_to_hass() await super().async_added_to_hass()
# Configure dynamic attribute exclusion for average sensors
self._configure_average_sensor_exclusions()
# Restore last state if available # Restore last state if available
await self._restore_last_state()
# Register listeners for time-sensitive updates
self._register_update_listeners()
# Trigger initial chart data loads as background tasks
self._trigger_chart_data_loads()
def _configure_average_sensor_exclusions(self) -> None:
"""Configure dynamic attribute exclusions for average sensors."""
# Dynamically exclude average attribute that matches state value
# (to avoid recording the same value twice: once as state, once as attribute)
key = self.entity_description.key
if key in (
"average_price_today",
"average_price_tomorrow",
"trailing_price_average",
"leading_price_average",
"current_hour_average_price",
"next_hour_average_price",
) or key.startswith("next_avg_"): # Future average sensors
display_mode = self.coordinator.config_entry.options.get(
CONF_AVERAGE_SENSOR_DISPLAY,
DEFAULT_AVERAGE_SENSOR_DISPLAY,
)
# Modify _state_info to add dynamic exclusion
if self._state_info is None:
self._state_info = {}
current_unrecorded = self._state_info.get("unrecorded_attributes", frozenset())
# State shows median → exclude price_median from attributes
# State shows mean → exclude price_mean from attributes
if display_mode == "median":
self._state_info["unrecorded_attributes"] = current_unrecorded | {"price_median"}
else:
self._state_info["unrecorded_attributes"] = current_unrecorded | {"price_mean"}
async def _restore_last_state(self) -> None:
"""Restore last state if available."""
if ( if (
(last_state := await self.async_get_last_state()) is not None (last_state := await self.async_get_last_state()) is not None
and last_state.state not in (None, "unknown", "unavailable", "") and last_state.state not in (None, "unknown", "unavailable", "")
@ -213,6 +254,8 @@ class TibberPricesSensor(TibberPricesEntity, RestoreSensor):
self._chart_metadata_response = metadata_attrs self._chart_metadata_response = metadata_attrs
self._chart_metadata_last_update = last_state.attributes.get("last_update") self._chart_metadata_last_update = last_state.attributes.get("last_update")
def _register_update_listeners(self) -> None:
"""Register listeners for time-sensitive updates."""
# Register with coordinator for time-sensitive updates if applicable # Register with coordinator for time-sensitive updates if applicable
if self.entity_description.key in TIME_SENSITIVE_ENTITY_KEYS: if self.entity_description.key in TIME_SENSITIVE_ENTITY_KEYS:
self._time_sensitive_remove_listener = self.coordinator.async_add_time_sensitive_listener( self._time_sensitive_remove_listener = self.coordinator.async_add_time_sensitive_listener(
@ -225,6 +268,8 @@ class TibberPricesSensor(TibberPricesEntity, RestoreSensor):
self._handle_minute_update self._handle_minute_update
) )
def _trigger_chart_data_loads(self) -> None:
"""Trigger initial chart data loads as background tasks."""
# For chart_data_export, trigger initial service call as background task # For chart_data_export, trigger initial service call as background task
# (non-blocking to avoid delaying entity setup) # (non-blocking to avoid delaying entity setup)
if self.entity_description.key == "chart_data_export": if self.entity_description.key == "chart_data_export":
@ -521,7 +566,7 @@ class TibberPricesSensor(TibberPricesEntity, RestoreSensor):
- "leading": Next 24 hours (96 intervals after current) - "leading": Next 24 hours (96 intervals after current)
Args: Args:
stat_func: Function from average_utils (e.g., calculate_current_trailing_avg) stat_func: Function from average_utils (e.g., calculate_current_trailing_mean)
Returns: Returns:
Price value in subunit currency units (cents/øre), or None if unavailable Price value in subunit currency units (cents/øre), or None if unavailable
@ -570,28 +615,37 @@ class TibberPricesSensor(TibberPricesEntity, RestoreSensor):
def _get_next_avg_n_hours_value(self, hours: int) -> float | None: def _get_next_avg_n_hours_value(self, hours: int) -> float | None:
""" """
Get average price for next N hours starting from next interval. Get mean price for next N hours starting from next interval.
Args: Args:
hours: Number of hours to look ahead (1, 2, 3, 4, 5, 6, 8, 12) hours: Number of hours to look ahead (1, 2, 3, 4, 5, 6, 8, 12)
Returns: Returns:
Average price in subunit currency units (e.g., cents), or None if unavailable Mean or median price (based on config) in subunit currency units (e.g., cents),
or None if unavailable
""" """
avg_price, median_price = calculate_next_n_hours_avg(self.coordinator.data, hours, time=self.coordinator.time) mean_price, median_price = calculate_next_n_hours_mean(self.coordinator.data, hours, time=self.coordinator.time)
if avg_price is None: if mean_price is None:
return None return None
# Get display unit factor (100 for minor, 1 for major) # Get display unit factor (100 for minor, 1 for major)
factor = get_display_unit_factor(self.coordinator.config_entry) factor = get_display_unit_factor(self.coordinator.config_entry)
# Store median for attributes # Get user preference for display (mean or median)
display_pref = self.coordinator.config_entry.options.get(
CONF_AVERAGE_SENSOR_DISPLAY, DEFAULT_AVERAGE_SENSOR_DISPLAY
)
# Store both values for attributes
self.cached_data[f"next_avg_{hours}h_mean"] = round(mean_price * factor, 2)
if median_price is not None: if median_price is not None:
self.cached_data[f"next_avg_{hours}h_median"] = round(median_price * factor, 2) self.cached_data[f"next_avg_{hours}h_median"] = round(median_price * factor, 2)
# Convert from major to display currency units # Return the value chosen for state display
return round(avg_price * factor, 2) if display_pref == "median" and median_price is not None:
return round(median_price * factor, 2)
return round(mean_price * factor, 2) # "mean"
def _get_data_timestamp(self) -> datetime | None: def _get_data_timestamp(self) -> datetime | None:
""" """

View file

@ -454,7 +454,7 @@ WINDOW_24H_SENSORS = (
# ---------------------------------------------------------------------------- # ----------------------------------------------------------------------------
# Calculate averages and trends for upcoming time windows # Calculate averages and trends for upcoming time windows
FUTURE_AVG_SENSORS = ( FUTURE_MEAN_SENSORS = (
# Default enabled: 1h-5h # Default enabled: 1h-5h
SensorEntityDescription( SensorEntityDescription(
key="next_avg_1h", key="next_avg_1h",
@ -1031,7 +1031,7 @@ ENTITY_DESCRIPTIONS = (
*DAILY_LEVEL_SENSORS, *DAILY_LEVEL_SENSORS,
*DAILY_RATING_SENSORS, *DAILY_RATING_SENSORS,
*WINDOW_24H_SENSORS, *WINDOW_24H_SENSORS,
*FUTURE_AVG_SENSORS, *FUTURE_MEAN_SENSORS,
*FUTURE_TREND_SENSORS, *FUTURE_TREND_SENSORS,
*VOLATILITY_SENSORS, *VOLATILITY_SENSORS,
*BEST_PRICE_TIMING_SENSORS, *BEST_PRICE_TIMING_SENSORS,

View file

@ -28,7 +28,7 @@ if TYPE_CHECKING:
from custom_components.tibber_prices.const import get_display_unit_factor from custom_components.tibber_prices.const import get_display_unit_factor
from custom_components.tibber_prices.coordinator.helpers import get_intervals_for_day_offsets from custom_components.tibber_prices.coordinator.helpers import get_intervals_for_day_offsets
from custom_components.tibber_prices.entity_utils.helpers import get_price_value from custom_components.tibber_prices.entity_utils.helpers import get_price_value
from custom_components.tibber_prices.utils.average import calculate_median from custom_components.tibber_prices.utils.average import calculate_mean, calculate_median
from custom_components.tibber_prices.utils.price import ( from custom_components.tibber_prices.utils.price import (
aggregate_price_levels, aggregate_price_levels,
aggregate_price_rating, aggregate_price_rating,
@ -38,7 +38,7 @@ if TYPE_CHECKING:
from collections.abc import Callable from collections.abc import Callable
def aggregate_price_data( def aggregate_average_data(
window_data: list[dict], window_data: list[dict],
config_entry: ConfigEntry, config_entry: ConfigEntry,
) -> tuple[float | None, float | None]: ) -> tuple[float | None, float | None]:
@ -57,12 +57,12 @@ def aggregate_price_data(
prices = [float(i["total"]) for i in window_data if "total" in i] prices = [float(i["total"]) for i in window_data if "total" in i]
if not prices: if not prices:
return None, None return None, None
# Calculate both average and median # Calculate both mean and median
avg = sum(prices) / len(prices) mean = calculate_mean(prices)
median = calculate_median(prices) median = calculate_median(prices)
# Convert to display currency unit based on configuration # Convert to display currency unit based on configuration
factor = get_display_unit_factor(config_entry) factor = get_display_unit_factor(config_entry)
return round(avg * factor, 2), round(median * factor, 2) if median is not None else None return round(mean * factor, 2), round(median * factor, 2) if median is not None else None
def aggregate_level_data(window_data: list[dict]) -> str | None: def aggregate_level_data(window_data: list[dict]) -> str | None:
@ -135,7 +135,7 @@ def aggregate_window_data(
""" """
# Map value types to aggregation functions # Map value types to aggregation functions
aggregators: dict[str, Callable] = { aggregators: dict[str, Callable] = {
"price": lambda data: aggregate_price_data(data, config_entry)[0], # Use only average from tuple "price": lambda data: aggregate_average_data(data, config_entry)[0], # Use only average from tuple
"level": lambda data: aggregate_level_data(data), "level": lambda data: aggregate_level_data(data),
"rating": lambda data: aggregate_rating_data(data, threshold_low, threshold_high), "rating": lambda data: aggregate_rating_data(data, threshold_low, threshold_high),
} }

View file

@ -5,12 +5,13 @@ from __future__ import annotations
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from custom_components.tibber_prices.utils.average import ( from custom_components.tibber_prices.utils.average import (
calculate_current_leading_avg,
calculate_current_leading_max, calculate_current_leading_max,
calculate_current_leading_mean,
calculate_current_leading_min, calculate_current_leading_min,
calculate_current_trailing_avg,
calculate_current_trailing_max, calculate_current_trailing_max,
calculate_current_trailing_mean,
calculate_current_trailing_min, calculate_current_trailing_min,
calculate_mean,
calculate_median, calculate_median,
) )
@ -131,14 +132,14 @@ def get_value_getter_mapping( # noqa: PLR0913 - needs all calculators as parame
"highest_price_today": lambda: daily_stat_calculator.get_daily_stat_value(day="today", stat_func=max), "highest_price_today": lambda: daily_stat_calculator.get_daily_stat_value(day="today", stat_func=max),
"average_price_today": lambda: daily_stat_calculator.get_daily_stat_value( "average_price_today": lambda: daily_stat_calculator.get_daily_stat_value(
day="today", day="today",
stat_func=lambda prices: (sum(prices) / len(prices), calculate_median(prices)), stat_func=lambda prices: (calculate_mean(prices), calculate_median(prices)),
), ),
# Tomorrow statistics sensors # Tomorrow statistics sensors
"lowest_price_tomorrow": lambda: daily_stat_calculator.get_daily_stat_value(day="tomorrow", stat_func=min), "lowest_price_tomorrow": lambda: daily_stat_calculator.get_daily_stat_value(day="tomorrow", stat_func=min),
"highest_price_tomorrow": lambda: daily_stat_calculator.get_daily_stat_value(day="tomorrow", stat_func=max), "highest_price_tomorrow": lambda: daily_stat_calculator.get_daily_stat_value(day="tomorrow", stat_func=max),
"average_price_tomorrow": lambda: daily_stat_calculator.get_daily_stat_value( "average_price_tomorrow": lambda: daily_stat_calculator.get_daily_stat_value(
day="tomorrow", day="tomorrow",
stat_func=lambda prices: (sum(prices) / len(prices), calculate_median(prices)), stat_func=lambda prices: (calculate_mean(prices), calculate_median(prices)),
), ),
# Daily aggregated level sensors # Daily aggregated level sensors
"yesterday_price_level": lambda: daily_stat_calculator.get_daily_aggregated_value( "yesterday_price_level": lambda: daily_stat_calculator.get_daily_aggregated_value(
@ -163,10 +164,10 @@ def get_value_getter_mapping( # noqa: PLR0913 - needs all calculators as parame
# ================================================================ # ================================================================
# Trailing and leading average sensors # Trailing and leading average sensors
"trailing_price_average": lambda: window_24h_calculator.get_24h_window_value( "trailing_price_average": lambda: window_24h_calculator.get_24h_window_value(
stat_func=calculate_current_trailing_avg, stat_func=calculate_current_trailing_mean,
), ),
"leading_price_average": lambda: window_24h_calculator.get_24h_window_value( "leading_price_average": lambda: window_24h_calculator.get_24h_window_value(
stat_func=calculate_current_leading_avg, stat_func=calculate_current_leading_mean,
), ),
# Trailing and leading min/max sensors # Trailing and leading min/max sensors
"trailing_price_min": lambda: window_24h_calculator.get_24h_window_value( "trailing_price_min": lambda: window_24h_calculator.get_24h_window_value(

View file

@ -17,13 +17,15 @@ For entity-specific utilities (icons, colors, attributes), see entity_utils/ pac
from __future__ import annotations from __future__ import annotations
from .average import ( from .average import (
calculate_current_leading_avg,
calculate_current_leading_max, calculate_current_leading_max,
calculate_current_leading_mean,
calculate_current_leading_min, calculate_current_leading_min,
calculate_current_trailing_avg,
calculate_current_trailing_max, calculate_current_trailing_max,
calculate_current_trailing_mean,
calculate_current_trailing_min, calculate_current_trailing_min,
calculate_next_n_hours_avg, calculate_mean,
calculate_median,
calculate_next_n_hours_mean,
) )
from .price import ( from .price import (
aggregate_period_levels, aggregate_period_levels,
@ -44,14 +46,16 @@ __all__ = [
"aggregate_period_ratings", "aggregate_period_ratings",
"aggregate_price_levels", "aggregate_price_levels",
"aggregate_price_rating", "aggregate_price_rating",
"calculate_current_leading_avg",
"calculate_current_leading_max", "calculate_current_leading_max",
"calculate_current_leading_mean",
"calculate_current_leading_min", "calculate_current_leading_min",
"calculate_current_trailing_avg",
"calculate_current_trailing_max", "calculate_current_trailing_max",
"calculate_current_trailing_mean",
"calculate_current_trailing_min", "calculate_current_trailing_min",
"calculate_difference_percentage", "calculate_difference_percentage",
"calculate_next_n_hours_avg", "calculate_mean",
"calculate_median",
"calculate_next_n_hours_mean",
"calculate_price_trend", "calculate_price_trend",
"calculate_rating_level", "calculate_rating_level",
"calculate_trailing_average_for_interval", "calculate_trailing_average_for_interval",

View file

@ -35,17 +35,43 @@ def calculate_median(prices: list[float]) -> float | None:
return sorted_prices[n // 2] return sorted_prices[n // 2]
def calculate_trailing_24h_avg(all_prices: list[dict], interval_start: datetime) -> tuple[float | None, float | None]: def calculate_mean(prices: list[float]) -> float:
""" """
Calculate trailing 24-hour average and median price for a given interval. Calculate arithmetic mean (average) from a list of prices.
Args:
prices: List of price values (must not be empty)
Returns:
Mean price
Raises:
ValueError: If prices list is empty
"""
if not prices:
msg = "Cannot calculate mean of empty list"
raise ValueError(msg)
return sum(prices) / len(prices)
def calculate_trailing_24h_mean(
all_prices: list[dict],
interval_start: datetime,
*,
time: TibberPricesTimeService,
) -> tuple[float | None, float | None]:
"""
Calculate trailing 24-hour mean and median price for a given interval.
Args: Args:
all_prices: List of all price data (yesterday, today, tomorrow combined) all_prices: List of all price data (yesterday, today, tomorrow combined)
interval_start: Start time of the interval to calculate average for interval_start: Start time of the interval to calculate mean for
time: TibberPricesTimeService instance (required) time: TibberPricesTimeService instance (required)
Returns: Returns:
Tuple of (average price, median price) for the 24 hours preceding the interval, Tuple of (mean price, median price) for the 24 hours preceding the interval,
or (None, None) if no data in window or (None, None) if no data in window
""" """
@ -56,34 +82,39 @@ def calculate_trailing_24h_avg(all_prices: list[dict], interval_start: datetime)
# Filter prices within the 24-hour window # Filter prices within the 24-hour window
prices_in_window = [] prices_in_window = []
for price_data in all_prices: for price_data in all_prices:
starts_at = price_data["startsAt"] # Already datetime object in local timezone starts_at = time.get_interval_time(price_data)
if starts_at is None: if starts_at is None:
continue continue
# Include intervals that start within the window (not including the current interval's end) # Include intervals that start within the window (not including the current interval's end)
if window_start <= starts_at < window_end: if window_start <= starts_at < window_end:
prices_in_window.append(float(price_data["total"])) prices_in_window.append(float(price_data["total"]))
# Calculate average and median # Calculate mean and median
# CRITICAL: Return None instead of 0.0 when no data available # CRITICAL: Return None instead of 0.0 when no data available
# With negative prices, 0.0 could be misinterpreted as a real average value # With negative prices, 0.0 could be misinterpreted as a real mean value
if prices_in_window: if prices_in_window:
avg = sum(prices_in_window) / len(prices_in_window) mean = calculate_mean(prices_in_window)
median = calculate_median(prices_in_window) median = calculate_median(prices_in_window)
return avg, median return mean, median
return None, None return None, None
def calculate_leading_24h_avg(all_prices: list[dict], interval_start: datetime) -> tuple[float | None, float | None]: def calculate_leading_24h_mean(
all_prices: list[dict],
interval_start: datetime,
*,
time: TibberPricesTimeService,
) -> tuple[float | None, float | None]:
""" """
Calculate leading 24-hour average and median price for a given interval. Calculate leading 24-hour mean and median price for a given interval.
Args: Args:
all_prices: List of all price data (yesterday, today, tomorrow combined) all_prices: List of all price data (yesterday, today, tomorrow combined)
interval_start: Start time of the interval to calculate average for interval_start: Start time of the interval to calculate mean for
time: TibberPricesTimeService instance (required) time: TibberPricesTimeService instance (required)
Returns: Returns:
Tuple of (average price, median price) for up to 24 hours following the interval, Tuple of (mean price, median price) for up to 24 hours following the interval,
or (None, None) if no data in window or (None, None) if no data in window
""" """
@ -94,77 +125,79 @@ def calculate_leading_24h_avg(all_prices: list[dict], interval_start: datetime)
# Filter prices within the 24-hour window # Filter prices within the 24-hour window
prices_in_window = [] prices_in_window = []
for price_data in all_prices: for price_data in all_prices:
starts_at = price_data["startsAt"] # Already datetime object in local timezone starts_at = time.get_interval_time(price_data)
if starts_at is None: if starts_at is None:
continue continue
# Include intervals that start within the window # Include intervals that start within the window
if window_start <= starts_at < window_end: if window_start <= starts_at < window_end:
prices_in_window.append(float(price_data["total"])) prices_in_window.append(float(price_data["total"]))
# Calculate average and median # Calculate mean and median
# CRITICAL: Return None instead of 0.0 when no data available # CRITICAL: Return None instead of 0.0 when no data available
# With negative prices, 0.0 could be misinterpreted as a real average value # With negative prices, 0.0 could be misinterpreted as a real mean value
if prices_in_window: if prices_in_window:
avg = sum(prices_in_window) / len(prices_in_window) mean = calculate_mean(prices_in_window)
median = calculate_median(prices_in_window) median = calculate_median(prices_in_window)
return avg, median return mean, median
return None, None return None, None
def calculate_current_trailing_avg( def calculate_current_trailing_mean(
coordinator_data: dict, coordinator_data: dict,
*, *,
time: TibberPricesTimeService, time: TibberPricesTimeService,
) -> float | None: ) -> tuple[float | None, float | None]:
""" """
Calculate the trailing 24-hour average for the current time. Calculate the trailing 24-hour mean and median for the current time.
Args: Args:
coordinator_data: The coordinator data containing priceInfo coordinator_data: The coordinator data containing priceInfo
time: TibberPricesTimeService instance (required) time: TibberPricesTimeService instance (required)
Returns: Returns:
Current trailing 24-hour average price, or None if unavailable Tuple of (mean price, median price), or (None, None) if unavailable
""" """
if not coordinator_data: if not coordinator_data:
return None return None, None
# Get all intervals (yesterday, today, tomorrow) via helper # Get all intervals (yesterday, today, tomorrow) via helper
all_prices = get_intervals_for_day_offsets(coordinator_data, [-1, 0, 1]) all_prices = get_intervals_for_day_offsets(coordinator_data, [-1, 0, 1])
if not all_prices: if not all_prices:
return None return None, None
now = time.now() now = time.now()
return calculate_trailing_24h_min(all_prices, now, time=time) # calculate_trailing_24h_mean returns (mean, median) tuple
return calculate_trailing_24h_mean(all_prices, now, time=time)
def calculate_current_leading_avg( def calculate_current_leading_mean(
coordinator_data: dict, coordinator_data: dict,
*, *,
time: TibberPricesTimeService, time: TibberPricesTimeService,
) -> float | None: ) -> tuple[float | None, float | None]:
""" """
Calculate the leading 24-hour average for the current time. Calculate the leading 24-hour mean and median for the current time.
Args: Args:
coordinator_data: The coordinator data containing priceInfo coordinator_data: The coordinator data containing priceInfo
time: TibberPricesTimeService instance (required) time: TibberPricesTimeService instance (required)
Returns: Returns:
Current leading 24-hour average price, or None if unavailable Tuple of (mean price, median price), or (None, None) if unavailable
""" """
if not coordinator_data: if not coordinator_data:
return None return None, None
# Get all intervals (yesterday, today, tomorrow) via helper # Get all intervals (yesterday, today, tomorrow) via helper
all_prices = get_intervals_for_day_offsets(coordinator_data, [-1, 0, 1]) all_prices = get_intervals_for_day_offsets(coordinator_data, [-1, 0, 1])
if not all_prices: if not all_prices:
return None return None, None
now = time.now() now = time.now()
return calculate_leading_24h_min(all_prices, now, time=time) # calculate_leading_24h_mean returns (mean, median) tuple
return calculate_leading_24h_mean(all_prices, now, time=time)
def calculate_trailing_24h_min( def calculate_trailing_24h_min(
@ -408,11 +441,7 @@ def calculate_current_leading_min(
return None return None
now = time.now() now = time.now()
# calculate_leading_24h_avg returns (avg, median) - we just need the avg return calculate_leading_24h_min(all_prices, now, time=time)
result = calculate_leading_24h_avg(all_prices, now)
if isinstance(result, tuple):
return result[0] # Return avg only
return None
def calculate_current_leading_max( def calculate_current_leading_max(
@ -443,16 +472,16 @@ def calculate_current_leading_max(
return calculate_leading_24h_max(all_prices, now, time=time) return calculate_leading_24h_max(all_prices, now, time=time)
def calculate_next_n_hours_avg( def calculate_next_n_hours_mean(
coordinator_data: dict, coordinator_data: dict,
hours: int, hours: int,
*, *,
time: TibberPricesTimeService, time: TibberPricesTimeService,
) -> tuple[float | None, float | None]: ) -> tuple[float | None, float | None]:
""" """
Calculate average and median price for the next N hours starting from the next interval. Calculate mean and median price for the next N hours starting from the next interval.
This function computes the average and median of all 15-minute intervals starting from This function computes the mean and median of all 15-minute intervals starting from
the next interval (not current) up to N hours into the future. the next interval (not current) up to N hours into the future.
Args: Args:
@ -461,7 +490,7 @@ def calculate_next_n_hours_avg(
time: TibberPricesTimeService instance (required) time: TibberPricesTimeService instance (required)
Returns: Returns:
Tuple of (average price, median price) for the next N hours, Tuple of (mean price, median price) for the next N hours,
or (None, None) if insufficient data or (None, None) if insufficient data
""" """
@ -506,7 +535,7 @@ def calculate_next_n_hours_avg(
if not prices_in_window: if not prices_in_window:
return None, None return None, None
# Return average and median (prefer full period, but allow graceful degradation) # Return mean and median (prefer full period, but allow graceful degradation)
avg = sum(prices_in_window) / len(prices_in_window) mean = calculate_mean(prices_in_window)
median = calculate_median(prices_in_window) median = calculate_median(prices_in_window)
return avg, median return mean, median