hass.tibber_prices/custom_components/tibber_prices/price_utils.py
Julian Pawlowski 7737dccd49 refactor(sensors): rename current price sensors for clarity
Renamed internal sensor keys to be more explicit about their temporal scope:
- current_price → current_interval_price
- price_level → current_interval_price_level
- price_rating → current_interval_price_rating

This naming makes it clearer that these sensors represent the current
15-minute interval, distinguishing them from hourly averages and other
time-based calculations.

Updated across all components:
- Sensor entity descriptions and handlers (sensor.py)
- Time-sensitive entity keys list (coordinator.py)
- Config flow step IDs (config_flow.py)
- Translation keys in all 5 languages (de, en, nb, nl, sv)
- Custom translations (entity descriptions, usage tips)
- Price level/rating lookups (const.py, sensor.py)
- Documentation examples (AGENTS.md, README.md)

Impact: Sensor entity IDs remain unchanged due to translation_key system.
Existing automations continue to work. Only internal code references and
translation structures updated for consistency.
2025-11-15 08:30:25 +00:00

525 lines
18 KiB
Python

"""Utility functions for price data calculations."""
from __future__ import annotations
import logging
import statistics
from datetime import datetime, timedelta
from typing import Any
from homeassistant.util import dt as dt_util
from .const import (
DEFAULT_VOLATILITY_THRESHOLD_HIGH,
DEFAULT_VOLATILITY_THRESHOLD_MODERATE,
DEFAULT_VOLATILITY_THRESHOLD_VERY_HIGH,
PRICE_LEVEL_MAPPING,
PRICE_LEVEL_NORMAL,
PRICE_RATING_NORMAL,
VOLATILITY_HIGH,
VOLATILITY_LOW,
VOLATILITY_MODERATE,
VOLATILITY_VERY_HIGH,
)
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
# Length of one price interval in minutes; used below to derive an interval's end time.
MINUTES_PER_INTERVAL = 15
MIN_PRICES_FOR_VOLATILITY = 2 # Minimum number of price values needed for volatility calculation
def calculate_volatility_level(
    prices: list[float],
    threshold_moderate: float | None = None,
    threshold_high: float | None = None,
    threshold_very_high: float | None = None,
) -> str:
    """
    Classify how strongly a series of prices fluctuates.

    The measure is the coefficient of variation expressed as a percentage
    (sample standard deviation divided by the mean, times 100). Because it is
    relative to the mean, the same thresholds can be applied regardless of the
    absolute price level or the number of intervals in the series.

    Args:
        prices: Price values to analyse.
        threshold_moderate: CV percentage at which the result becomes MODERATE.
            Falls back to DEFAULT_VOLATILITY_THRESHOLD_MODERATE when None.
        threshold_high: CV percentage at which the result becomes HIGH.
            Falls back to DEFAULT_VOLATILITY_THRESHOLD_HIGH when None.
        threshold_very_high: CV percentage at which the result becomes VERY_HIGH.
            Falls back to DEFAULT_VOLATILITY_THRESHOLD_VERY_HIGH when None.

    Returns:
        One of VOLATILITY_LOW, VOLATILITY_MODERATE, VOLATILITY_HIGH or
        VOLATILITY_VERY_HIGH. VOLATILITY_LOW is also returned when fewer than
        MIN_PRICES_FOR_VOLATILITY values are supplied or when the mean is
        zero or negative.

    """
    # A sample standard deviation is undefined for fewer than two values.
    if len(prices) < MIN_PRICES_FOR_VOLATILITY:
        return VOLATILITY_LOW

    average = statistics.mean(prices)
    if average <= 0:
        # A zero mean would divide by zero; a negative mean would flip the sign of the CV.
        return VOLATILITY_LOW

    cv_percent = statistics.stdev(prices) / average * 100

    # Caller-supplied thresholds win; otherwise use the module defaults.
    moderate_limit = DEFAULT_VOLATILITY_THRESHOLD_MODERATE if threshold_moderate is None else threshold_moderate
    high_limit = DEFAULT_VOLATILITY_THRESHOLD_HIGH if threshold_high is None else threshold_high
    very_high_limit = DEFAULT_VOLATILITY_THRESHOLD_VERY_HIGH if threshold_very_high is None else threshold_very_high

    # Walk the bands from calmest to wildest; the first upper bound not reached decides.
    bands = (
        (moderate_limit, VOLATILITY_LOW),
        (high_limit, VOLATILITY_MODERATE),
        (very_high_limit, VOLATILITY_HIGH),
    )
    for upper_bound, label in bands:
        if cv_percent < upper_bound:
            return label
    return VOLATILITY_VERY_HIGH
def calculate_trailing_average_for_interval(
    interval_start: datetime,
    all_prices: list[dict[str, Any]],
) -> float | None:
    """
    Average the prices of the 24 hours leading up to an interval.

    Args:
        interval_start: Start of the interval the average is computed for.
        all_prices: Price entries to search; each entry is read via its
            "startsAt" and "total" keys.

    Returns:
        The arithmetic mean of every "total" whose "startsAt" lies in the
        half-open window [interval_start - 24h, interval_start), or None when
        the input is empty or no entry falls inside that window.

    """
    if not all_prices:
        return None

    window_start = interval_start - timedelta(hours=24)
    window_totals: list[float] = []

    for entry in all_prices:
        raw_start = entry.get("startsAt")
        # Entries without a (parsable) timestamp cannot be placed in the window.
        parsed_start = dt_util.parse_datetime(raw_start) if raw_start else None
        if parsed_start is None:
            continue

        # Compare in local time, like the rest of this module does.
        local_start = dt_util.as_local(parsed_start)
        if not (window_start <= local_start < interval_start):
            continue

        total = entry.get("total")
        if total is not None:
            window_totals.append(float(total))

    if window_totals:
        return sum(window_totals) / len(window_totals)

    _LOGGER.debug(
        "No prices found in 24-hour lookback window for interval starting at %s (lookback: %s to %s)",
        interval_start,
        window_start,
        interval_start,
    )
    return None
def calculate_difference_percentage(
    current_interval_price: float,
    trailing_average: float | None,
) -> float | None:
    """
    Calculate how far the current price deviates from the trailing average, in percent.

    The deviation is normalised by the absolute value of the average so that the
    sign of the result always reflects the direction of the deviation: positive
    means the current price is above the average, negative means it is below.
    For a positive average this is identical to the plain formula
    ((current - average) / average) * 100; the two only differ when the average
    is negative, where the plain formula would invert the sign.

    Args:
        current_interval_price: The current interval's price
        trailing_average: The 24-hour trailing average price

    Returns:
        ((current - average) / abs(average)) * 100,
        or None if trailing_average is None or zero.

    """
    if trailing_average is None or trailing_average == 0:
        return None
    # abs() keeps "more expensive than average" positive even when the average is negative.
    return ((current_interval_price - trailing_average) / abs(trailing_average)) * 100
def calculate_rating_level(
    difference: float | None,
    threshold_low: float,
    threshold_high: float,
) -> str | None:
    """
    Map a difference percentage onto a rating level.

    Args:
        difference: Difference percentage, or None when it could not be computed.
        threshold_low: Values at or below this percentage rate as "LOW".
        threshold_high: Values at or above this percentage rate as "HIGH".

    Returns:
        None when difference is None, "LOW" or "HIGH" when exactly one of the
        thresholds is met, and PRICE_RATING_NORMAL otherwise - including the
        case where both thresholds are met at once, which is only possible
        when threshold_low is not below threshold_high.

    """
    if difference is None:
        return None

    at_or_below_low = difference <= threshold_low
    at_or_above_high = difference >= threshold_high

    # Overlapping thresholds are ambiguous; fall back to the neutral rating.
    if at_or_below_low and at_or_above_high:
        return PRICE_RATING_NORMAL
    if at_or_below_low:
        return "LOW"
    if at_or_above_high:
        return "HIGH"
    return PRICE_RATING_NORMAL
def _process_price_interval(
price_interval: dict[str, Any],
all_prices: list[dict[str, Any]],
threshold_low: float,
threshold_high: float,
day_label: str,
) -> None:
"""
Process a single price interval and add difference and rating_level.
Args:
price_interval: The price interval to process (modified in place)
all_prices: All available price intervals for lookback calculation
threshold_low: Low threshold percentage
threshold_high: High threshold percentage
day_label: Label for logging ("today" or "tomorrow")
"""
starts_at_str = price_interval.get("startsAt")
if not starts_at_str:
return
starts_at = dt_util.parse_datetime(starts_at_str)
if starts_at is None:
return
starts_at = dt_util.as_local(starts_at)
current_interval_price = price_interval.get("total")
if current_interval_price is None:
return
# Calculate trailing average
trailing_avg = calculate_trailing_average_for_interval(starts_at, all_prices)
# Calculate and set the difference and rating_level
if trailing_avg is not None:
difference = calculate_difference_percentage(float(current_interval_price), trailing_avg)
price_interval["difference"] = difference
# Calculate rating_level based on difference
rating_level = calculate_rating_level(difference, threshold_low, threshold_high)
price_interval["rating_level"] = rating_level
else:
# Set to None if we couldn't calculate
price_interval["difference"] = None
price_interval["rating_level"] = None
_LOGGER.debug(
"Could not calculate trailing average for %s interval %s",
day_label,
starts_at,
)
def enrich_price_info_with_differences(
    price_info: dict[str, Any],
    threshold_low: float | None = None,
    threshold_high: float | None = None,
) -> dict[str, Any]:
    """
    Add 'difference' and 'rating_level' to every interval of today and tomorrow.

    Intervals under "yesterday" are not annotated themselves; they only serve
    as lookback data for the trailing average of later intervals.

    Args:
        price_info: Mapping read via its "yesterday", "today" and "tomorrow" keys;
            a missing key is treated as an empty list.
        threshold_low: Low threshold percentage for rating_level (-10 when None).
        threshold_high: High threshold percentage for rating_level (10 when None).

    Returns:
        The same price_info object, with its today/tomorrow intervals mutated in place.

    """
    low = -10 if threshold_low is None else threshold_low
    high = 10 if threshold_high is None else threshold_high

    per_day = {day: price_info.get(day, []) for day in ("yesterday", "today", "tomorrow")}

    # One combined pool so each interval can look back across day boundaries.
    lookback_pool = per_day["yesterday"] + per_day["today"] + per_day["tomorrow"]

    _LOGGER.debug(
        "Enriching price info with differences and rating levels: "
        "yesterday=%d, today=%d, tomorrow=%d, thresholds: low=%.2f, high=%.2f",
        len(per_day["yesterday"]),
        len(per_day["today"]),
        len(per_day["tomorrow"]),
        low,
        high,
    )

    for day_label in ("today", "tomorrow"):
        for interval in per_day[day_label]:
            _process_price_interval(interval, lookback_pool, low, high, day_label)

    return price_info
def find_price_data_for_interval(price_info: Any, target_time: datetime) -> dict | None:
    """
    Find the price data for the interval that contains a given timestamp.

    The day that target_time most likely belongs to ("tomorrow" if its date is
    after the current local date, otherwise "today") is searched first, then
    the other one. Entries without a usable "startsAt" timestamp are skipped,
    matching how the other helpers in this module treat incomplete entries.

    Args:
        price_info: Mapping read via its "today" and "tomorrow" keys. None or an
            empty mapping yields None instead of raising.
        target_time: The timestamp to locate; compared against local interval start times.

    Returns:
        The first entry whose MINUTES_PER_INTERVAL-long interval contains
        target_time and starts on the same calendar date, or None if there is none.

    """
    if not price_info:
        return None

    target_date = target_time.date()
    primary_day = "tomorrow" if target_date > dt_util.now().date() else "today"
    fallback_day = "today" if primary_day == "tomorrow" else "tomorrow"

    for search_day in (primary_day, fallback_day):
        # `or []` also covers a key that is present but explicitly None.
        for price_data in price_info.get(search_day) or []:
            starts_at_str = price_data.get("startsAt")
            if not starts_at_str:
                continue
            starts_at = dt_util.parse_datetime(starts_at_str)
            if starts_at is None:
                continue
            starts_at = dt_util.as_local(starts_at)
            interval_end = starts_at + timedelta(minutes=MINUTES_PER_INTERVAL)
            if starts_at <= target_time < interval_end and starts_at.date() == target_date:
                return price_data
    return None
def aggregate_price_levels(levels: list[str]) -> str:
    """
    Reduce several price levels to one representative level via the median.

    Each level string is translated to its numeric rank through
    PRICE_LEVEL_MAPPING (unknown strings count as rank 0), the ranks are
    sorted, and the middle one is translated back to a level string.

    Args:
        levels: Price level strings collected from intervals.

    Returns:
        The level whose rank is the median of the input ranks, or
        PRICE_LEVEL_NORMAL when the input is empty or the median rank has no
        entry in PRICE_LEVEL_MAPPING.

    """
    if not levels:
        return PRICE_LEVEL_NORMAL

    ranked = sorted(PRICE_LEVEL_MAPPING.get(name, 0) for name in levels)

    # Integer division picks the exact middle for odd lengths and the
    # upper of the two middle elements for even lengths.
    middle_rank = ranked[len(ranked) // 2]

    # Reverse lookup: first level (in mapping order) carrying the median rank.
    return next(
        (name for name, rank in PRICE_LEVEL_MAPPING.items() if rank == middle_rank),
        PRICE_LEVEL_NORMAL,
    )
def aggregate_price_rating(differences: list[float], threshold_low: float, threshold_high: float) -> tuple[str, float]:
    """
    Combine several difference percentages into one rating.

    The non-None differences are averaged and the average is classified with
    calculate_rating_level() using the supplied thresholds.

    Args:
        differences: Difference percentages from intervals; None entries are ignored.
        threshold_low: The low threshold percentage for the LOW rating.
        threshold_high: The high threshold percentage for the HIGH rating.

    Returns:
        Tuple of (rating_level, average_difference). When there is nothing to
        average the result is (PRICE_RATING_NORMAL, 0.0).

    """
    # `or ()` keeps the "no input at all" case on the same path as "only Nones".
    usable = [value for value in (differences or ()) if value is not None]
    if not usable:
        return PRICE_RATING_NORMAL, 0.0

    mean_difference = sum(usable) / len(usable)
    level = calculate_rating_level(mean_difference, threshold_low, threshold_high)

    # Fall back to the neutral rating if no level was produced.
    return (level if level else PRICE_RATING_NORMAL), mean_difference
def aggregate_period_levels(interval_data_list: list[dict[str, Any]]) -> str | None:
    """
    Determine the overall price level for a period made of several intervals.

    Reads "level" from every interval, keeps the string values and hands them
    to aggregate_price_levels().

    Args:
        interval_data_list: Price interval dictionaries, each optionally carrying a "level" key.

    Returns:
        The aggregated level converted to lowercase, or None when no interval
        provides a string level.

    """
    candidates = [entry.get("level") for entry in interval_data_list]
    # isinstance() already rejects None, so one check covers both conditions.
    text_levels: list[str] = [candidate for candidate in candidates if isinstance(candidate, str)]
    if not text_levels:
        return None

    combined = aggregate_price_levels(text_levels)
    if not combined:
        return None
    # Lowercase output, as the caller-facing form of the level.
    return combined.lower()
def aggregate_period_ratings(
    interval_data_list: list[dict[str, Any]],
    threshold_low: float,
    threshold_high: float,
) -> tuple[str | None, float | None]:
    """
    Determine the overall price rating for a period made of several intervals.

    Reads "difference" from every interval, converts the non-None values to
    float and hands them to aggregate_price_rating().

    Args:
        interval_data_list: Price interval dictionaries, each optionally carrying a "difference" key.
        threshold_low: The low threshold percentage for the LOW rating.
        threshold_high: The high threshold percentage for the HIGH rating.

    Returns:
        Tuple of (rating_level, average_difference) where rating_level is the
        aggregated rating converted to lowercase. Both elements are None when
        no interval provides a difference.

    """
    raw_values = (entry.get("difference") for entry in interval_data_list)
    usable: list[float] = [float(value) for value in raw_values if value is not None]
    if not usable:
        return None, None

    level, mean_difference = aggregate_price_rating(usable, threshold_low, threshold_high)
    # Lowercase output, as the caller-facing form of the rating.
    lowered = level.lower() if level else None
    return lowered, mean_difference
def calculate_price_trend(
    current_interval_price: float,
    future_average: float,
    threshold_rising: float = 5.0,
    threshold_falling: float = -5.0,
) -> tuple[str, float]:
    """
    Calculate price trend by comparing current price with future average.

    The change is normalised by the absolute value of the current price so that
    a positive percentage always means the future average is higher than the
    current price, even when the current price is negative. For a positive
    current price this equals (future - current) / current * 100.

    Args:
        current_interval_price: Current interval price
        future_average: Average price of future intervals
        threshold_rising: Percentage threshold for rising trend (positive, default 5%)
        threshold_falling: Percentage threshold for falling trend (negative, default -5%)

    Returns:
        Tuple of (trend_state, difference_percentage)
        trend_state: "rising" | "falling" | "stable"
        difference_percentage: % change from current to future
            ((future - current) / abs(current) * 100); 0.0 when the current price is zero

    """
    if current_interval_price == 0:
        # A relative change from zero is undefined; report no trend.
        return "stable", 0.0

    # abs() keeps the sign tied to the direction of the change for negative prices.
    diff_pct = ((future_average - current_interval_price) / abs(current_interval_price)) * 100

    # threshold_falling is itself negative, so it is compared directly.
    if diff_pct > threshold_rising:
        trend = "rising"
    elif diff_pct < threshold_falling:
        trend = "falling"
    else:
        trend = "stable"
    return trend, diff_pct