mirror of
https://github.com/jpawlowski/hass.tibber_prices.git
synced 2026-03-30 05:13:40 +00:00
Replaced absolute volatility thresholds (ct/øre) with relative coefficient of variation (CV = std_dev / mean * 100%) for scale-independent volatility measurement that works across all price levels. Changes to volatility calculation: - price_utils.py: Rewrote calculate_volatility_level() to accept price list instead of spread value, using statistics.mean() and statistics.stdev() - sensor.py: Updated volatility sensors to pass price lists (not spread) - services.py: Modified _get_price_stats() to calculate CV from prices - period_statistics.py: Extract prices for CV calculation in period summaries - const.py: Updated default thresholds to 15%/30%/50% (was 5/15/30 ct) with comprehensive documentation explaining CV-based approach Dead code removal: - period_utils/core.py: Removed filter_periods_by_volatility() function (86 lines of code that was never actually called) - period_utils/__init__.py: Removed dead function export - period_utils/relaxation.py: Simplified callback signature from Callable[[str|None, str|None], bool] to Callable[[str|None], bool] - coordinator.py: Updated lambda callbacks to match new signature - const.py: Replaced RELAXATION_VOLATILITY_ANY with RELAXATION_LEVEL_ANY Bug fix: - relaxation.py: Added int() conversion for max_relaxation_attempts (line 435: attempts = max(1, int(max_relaxation_attempts))) Fixes TypeError when config value arrives as float Configuration UI: - config_flow.py: Changed volatility threshold unit display from "ct" to "%" Translations (all 5 languages): - Updated volatility descriptions to explain coefficient of variation - Changed threshold labels from "spread ≥ value" to "CV ≥ percentage" - Languages: de, en, nb, nl, sv Documentation: - period-calculation.md: Removed volatility filter section (dead feature) Impact: Breaking change for users with custom volatility thresholds. Old absolute values (e.g., 5 ct) will be interpreted as percentages (5%). However, new defaults (15%/30%/50%) are more conservative and work universally across all currencies and price levels. No data migration needed - existing configs continue to work with new interpretation.
525 lines
18 KiB
Python
525 lines
18 KiB
Python
"""Utility functions for price data calculations."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import statistics
|
|
from datetime import datetime, timedelta
|
|
from typing import Any
|
|
|
|
from homeassistant.util import dt as dt_util
|
|
|
|
from .const import (
|
|
DEFAULT_VOLATILITY_THRESHOLD_HIGH,
|
|
DEFAULT_VOLATILITY_THRESHOLD_MODERATE,
|
|
DEFAULT_VOLATILITY_THRESHOLD_VERY_HIGH,
|
|
PRICE_LEVEL_MAPPING,
|
|
PRICE_LEVEL_NORMAL,
|
|
PRICE_RATING_NORMAL,
|
|
VOLATILITY_HIGH,
|
|
VOLATILITY_LOW,
|
|
VOLATILITY_MODERATE,
|
|
VOLATILITY_VERY_HIGH,
|
|
)
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
MINUTES_PER_INTERVAL = 15
|
|
MIN_PRICES_FOR_VOLATILITY = 2 # Minimum number of price values needed for volatility calculation
|
|
|
|
|
|
def calculate_volatility_level(
|
|
prices: list[float],
|
|
threshold_moderate: float | None = None,
|
|
threshold_high: float | None = None,
|
|
threshold_very_high: float | None = None,
|
|
) -> str:
|
|
"""
|
|
Calculate volatility level from price list using coefficient of variation.
|
|
|
|
Volatility indicates how much prices fluctuate during a period, which helps
|
|
determine whether active load shifting is worthwhile. Uses the coefficient
|
|
of variation (CV = std_dev / mean * 100%) for relative comparison that works
|
|
across different price levels and period lengths.
|
|
|
|
Args:
|
|
prices: List of price values (in any unit, typically major currency units like EUR or NOK)
|
|
threshold_moderate: Custom threshold for MODERATE level (default: use DEFAULT_VOLATILITY_THRESHOLD_MODERATE)
|
|
threshold_high: Custom threshold for HIGH level (default: use DEFAULT_VOLATILITY_THRESHOLD_HIGH)
|
|
threshold_very_high: Custom threshold for VERY_HIGH level (default: use DEFAULT_VOLATILITY_THRESHOLD_VERY_HIGH)
|
|
|
|
Returns:
|
|
Volatility level: "LOW", "MODERATE", "HIGH", or "VERY_HIGH" (uppercase)
|
|
|
|
Examples:
|
|
- CV < 15%: LOW → minimal optimization potential, prices relatively stable
|
|
- 15% ≤ CV < 30%: MODERATE → some optimization worthwhile, noticeable variation
|
|
- 30% ≤ CV < 50%: HIGH → strong optimization recommended, significant swings
|
|
- CV ≥ 50%: VERY_HIGH → maximum optimization potential, extreme volatility
|
|
|
|
Note:
|
|
Requires at least 2 price values for calculation. Returns LOW if insufficient data.
|
|
Works identically for short periods (2-3 intervals) and long periods (96 intervals/day).
|
|
|
|
"""
|
|
# Need at least 2 values for standard deviation
|
|
if len(prices) < MIN_PRICES_FOR_VOLATILITY:
|
|
return VOLATILITY_LOW
|
|
|
|
# Use provided thresholds or fall back to constants
|
|
t_moderate = threshold_moderate if threshold_moderate is not None else DEFAULT_VOLATILITY_THRESHOLD_MODERATE
|
|
t_high = threshold_high if threshold_high is not None else DEFAULT_VOLATILITY_THRESHOLD_HIGH
|
|
t_very_high = threshold_very_high if threshold_very_high is not None else DEFAULT_VOLATILITY_THRESHOLD_VERY_HIGH
|
|
|
|
# Calculate coefficient of variation
|
|
mean = statistics.mean(prices)
|
|
if mean <= 0:
|
|
# Avoid division by zero or negative mean (shouldn't happen with prices)
|
|
return VOLATILITY_LOW
|
|
|
|
std_dev = statistics.stdev(prices)
|
|
coefficient_of_variation = (std_dev / mean) * 100 # As percentage
|
|
|
|
# Classify based on thresholds
|
|
if coefficient_of_variation < t_moderate:
|
|
return VOLATILITY_LOW
|
|
if coefficient_of_variation < t_high:
|
|
return VOLATILITY_MODERATE
|
|
if coefficient_of_variation < t_very_high:
|
|
return VOLATILITY_HIGH
|
|
return VOLATILITY_VERY_HIGH
|
|
|
|
|
|
def calculate_trailing_average_for_interval(
|
|
interval_start: datetime,
|
|
all_prices: list[dict[str, Any]],
|
|
) -> float | None:
|
|
"""
|
|
Calculate the trailing 24-hour average price for a specific interval.
|
|
|
|
Args:
|
|
interval_start: The start time of the interval we're calculating for
|
|
all_prices: List of all available price intervals (yesterday + today + tomorrow)
|
|
|
|
Returns:
|
|
The average price of all intervals in the 24 hours before interval_start,
|
|
or None if insufficient data is available.
|
|
|
|
"""
|
|
if not all_prices:
|
|
return None
|
|
|
|
# Calculate the lookback period: 24 hours before this interval
|
|
lookback_start = interval_start - timedelta(hours=24)
|
|
|
|
# Collect all prices that fall within the 24-hour lookback window
|
|
matching_prices = []
|
|
|
|
for price_data in all_prices:
|
|
starts_at_str = price_data.get("startsAt")
|
|
if not starts_at_str:
|
|
continue
|
|
|
|
# Parse the timestamp
|
|
price_time = dt_util.parse_datetime(starts_at_str)
|
|
if price_time is None:
|
|
continue
|
|
|
|
# Convert to local timezone for comparison
|
|
price_time = dt_util.as_local(price_time)
|
|
|
|
# Check if this price falls within our lookback window
|
|
# Include prices that start >= lookback_start and start < interval_start
|
|
if lookback_start <= price_time < interval_start:
|
|
total_price = price_data.get("total")
|
|
if total_price is not None:
|
|
matching_prices.append(float(total_price))
|
|
|
|
if not matching_prices:
|
|
_LOGGER.debug(
|
|
"No prices found in 24-hour lookback window for interval starting at %s (lookback: %s to %s)",
|
|
interval_start,
|
|
lookback_start,
|
|
interval_start,
|
|
)
|
|
return None
|
|
|
|
# Calculate and return the average
|
|
return sum(matching_prices) / len(matching_prices)
|
|
|
|
|
|
def calculate_difference_percentage(
|
|
current_price: float,
|
|
trailing_average: float | None,
|
|
) -> float | None:
|
|
"""
|
|
Calculate the difference percentage between current price and trailing average.
|
|
|
|
This mimics the API's "difference" field from priceRating endpoint.
|
|
|
|
Args:
|
|
current_price: The current interval's price
|
|
trailing_average: The 24-hour trailing average price
|
|
|
|
Returns:
|
|
The percentage difference: ((current - average) / average) * 100
|
|
or None if trailing_average is None or zero.
|
|
|
|
"""
|
|
if trailing_average is None or trailing_average == 0:
|
|
return None
|
|
|
|
return ((current_price - trailing_average) / trailing_average) * 100
|
|
|
|
|
|
def calculate_rating_level(
|
|
difference: float | None,
|
|
threshold_low: float,
|
|
threshold_high: float,
|
|
) -> str | None:
|
|
"""
|
|
Calculate the rating level based on difference percentage and thresholds.
|
|
|
|
This mimics the API's "level" field from priceRating endpoint.
|
|
|
|
Args:
|
|
difference: The difference percentage (from calculate_difference_percentage)
|
|
threshold_low: The low threshold percentage (typically -100 to 0)
|
|
threshold_high: The high threshold percentage (typically 0 to 100)
|
|
|
|
Returns:
|
|
"LOW" if difference <= threshold_low
|
|
"HIGH" if difference >= threshold_high
|
|
"NORMAL" otherwise
|
|
None if difference is None
|
|
|
|
"""
|
|
if difference is None:
|
|
return None
|
|
|
|
# If difference falls in both ranges (shouldn't normally happen), return NORMAL
|
|
if difference <= threshold_low and difference >= threshold_high:
|
|
return PRICE_RATING_NORMAL
|
|
|
|
# Classify based on thresholds
|
|
if difference <= threshold_low:
|
|
return "LOW"
|
|
|
|
if difference >= threshold_high:
|
|
return "HIGH"
|
|
|
|
return PRICE_RATING_NORMAL
|
|
|
|
|
|
def _process_price_interval(
|
|
price_interval: dict[str, Any],
|
|
all_prices: list[dict[str, Any]],
|
|
threshold_low: float,
|
|
threshold_high: float,
|
|
day_label: str,
|
|
) -> None:
|
|
"""
|
|
Process a single price interval and add difference and rating_level.
|
|
|
|
Args:
|
|
price_interval: The price interval to process (modified in place)
|
|
all_prices: All available price intervals for lookback calculation
|
|
threshold_low: Low threshold percentage
|
|
threshold_high: High threshold percentage
|
|
day_label: Label for logging ("today" or "tomorrow")
|
|
|
|
"""
|
|
starts_at_str = price_interval.get("startsAt")
|
|
if not starts_at_str:
|
|
return
|
|
|
|
starts_at = dt_util.parse_datetime(starts_at_str)
|
|
if starts_at is None:
|
|
return
|
|
|
|
starts_at = dt_util.as_local(starts_at)
|
|
current_price = price_interval.get("total")
|
|
|
|
if current_price is None:
|
|
return
|
|
|
|
# Calculate trailing average
|
|
trailing_avg = calculate_trailing_average_for_interval(starts_at, all_prices)
|
|
|
|
# Calculate and set the difference and rating_level
|
|
if trailing_avg is not None:
|
|
difference = calculate_difference_percentage(float(current_price), trailing_avg)
|
|
price_interval["difference"] = difference
|
|
|
|
# Calculate rating_level based on difference
|
|
rating_level = calculate_rating_level(difference, threshold_low, threshold_high)
|
|
price_interval["rating_level"] = rating_level
|
|
else:
|
|
# Set to None if we couldn't calculate
|
|
price_interval["difference"] = None
|
|
price_interval["rating_level"] = None
|
|
_LOGGER.debug(
|
|
"Could not calculate trailing average for %s interval %s",
|
|
day_label,
|
|
starts_at,
|
|
)
|
|
|
|
|
|
def enrich_price_info_with_differences(
|
|
price_info: dict[str, Any],
|
|
threshold_low: float | None = None,
|
|
threshold_high: float | None = None,
|
|
) -> dict[str, Any]:
|
|
"""
|
|
Enrich price info with calculated 'difference' and 'rating_level' values.
|
|
|
|
Computes the trailing 24-hour average, difference percentage, and rating level
|
|
for each interval in today and tomorrow (excluding yesterday since it's historical).
|
|
|
|
Args:
|
|
price_info: Dictionary with 'yesterday', 'today', 'tomorrow' keys
|
|
threshold_low: Low threshold percentage for rating_level (defaults to -10)
|
|
threshold_high: High threshold percentage for rating_level (defaults to 10)
|
|
|
|
Returns:
|
|
Updated price_info dict with 'difference' and 'rating_level' added
|
|
|
|
"""
|
|
if threshold_low is None:
|
|
threshold_low = -10
|
|
if threshold_high is None:
|
|
threshold_high = 10
|
|
|
|
yesterday_prices = price_info.get("yesterday", [])
|
|
today_prices = price_info.get("today", [])
|
|
tomorrow_prices = price_info.get("tomorrow", [])
|
|
|
|
# Combine all prices for lookback calculation
|
|
all_prices = yesterday_prices + today_prices + tomorrow_prices
|
|
|
|
_LOGGER.debug(
|
|
"Enriching price info with differences and rating levels: "
|
|
"yesterday=%d, today=%d, tomorrow=%d, thresholds: low=%.2f, high=%.2f",
|
|
len(yesterday_prices),
|
|
len(today_prices),
|
|
len(tomorrow_prices),
|
|
threshold_low,
|
|
threshold_high,
|
|
)
|
|
|
|
# Process today's prices
|
|
for price_interval in today_prices:
|
|
_process_price_interval(price_interval, all_prices, threshold_low, threshold_high, "today")
|
|
|
|
# Process tomorrow's prices
|
|
for price_interval in tomorrow_prices:
|
|
_process_price_interval(price_interval, all_prices, threshold_low, threshold_high, "tomorrow")
|
|
|
|
return price_info
|
|
|
|
|
|
def find_price_data_for_interval(price_info: Any, target_time: datetime) -> dict | None:
|
|
"""
|
|
Find the price data for a specific 15-minute interval timestamp.
|
|
|
|
Args:
|
|
price_info: The price info dictionary from Tibber API
|
|
target_time: The target timestamp to find price data for
|
|
|
|
Returns:
|
|
Price data dict if found, None otherwise
|
|
|
|
"""
|
|
day_key = "tomorrow" if target_time.date() > dt_util.now().date() else "today"
|
|
search_days = [day_key, "tomorrow" if day_key == "today" else "today"]
|
|
|
|
for search_day in search_days:
|
|
day_prices = price_info.get(search_day, [])
|
|
if not day_prices:
|
|
continue
|
|
|
|
for price_data in day_prices:
|
|
starts_at = dt_util.parse_datetime(price_data["startsAt"])
|
|
if starts_at is None:
|
|
continue
|
|
|
|
starts_at = dt_util.as_local(starts_at)
|
|
interval_end = starts_at + timedelta(minutes=MINUTES_PER_INTERVAL)
|
|
if starts_at <= target_time < interval_end and starts_at.date() == target_time.date():
|
|
return price_data
|
|
|
|
return None
|
|
|
|
|
|
def aggregate_price_levels(levels: list[str]) -> str:
|
|
"""
|
|
Aggregate multiple price levels into a single representative level using median.
|
|
|
|
Takes a list of price level strings (e.g., "VERY_CHEAP", "NORMAL", "EXPENSIVE")
|
|
and returns the median level after sorting by numeric values. This naturally
|
|
tends toward "NORMAL" when levels are mixed.
|
|
|
|
Args:
|
|
levels: List of price level strings from intervals
|
|
|
|
Returns:
|
|
The median price level string, or PRICE_LEVEL_NORMAL if input is empty
|
|
|
|
"""
|
|
if not levels:
|
|
return PRICE_LEVEL_NORMAL
|
|
|
|
# Convert levels to numeric values and sort
|
|
numeric_values = [PRICE_LEVEL_MAPPING.get(level, 0) for level in levels]
|
|
numeric_values.sort()
|
|
|
|
# Get median (middle value for odd length, lower-middle for even length)
|
|
median_idx = len(numeric_values) // 2
|
|
median_value = numeric_values[median_idx]
|
|
|
|
# Convert back to level string
|
|
for level, value in PRICE_LEVEL_MAPPING.items():
|
|
if value == median_value:
|
|
return level
|
|
|
|
return PRICE_LEVEL_NORMAL
|
|
|
|
|
|
def aggregate_price_rating(differences: list[float], threshold_low: float, threshold_high: float) -> tuple[str, float]:
|
|
"""
|
|
Aggregate multiple price differences into a single rating level.
|
|
|
|
Calculates the average difference percentage across multiple intervals
|
|
and applies thresholds to determine the overall rating level.
|
|
|
|
Args:
|
|
differences: List of difference percentages from intervals
|
|
threshold_low: The low threshold percentage for LOW rating
|
|
threshold_high: The high threshold percentage for HIGH rating
|
|
|
|
Returns:
|
|
Tuple of (rating_level, average_difference)
|
|
rating_level: "LOW", "NORMAL", or "HIGH"
|
|
average_difference: The averaged difference percentage
|
|
|
|
"""
|
|
if not differences:
|
|
return PRICE_RATING_NORMAL, 0.0
|
|
|
|
# Filter out None values
|
|
valid_differences = [d for d in differences if d is not None]
|
|
if not valid_differences:
|
|
return PRICE_RATING_NORMAL, 0.0
|
|
|
|
# Calculate average difference
|
|
avg_difference = sum(valid_differences) / len(valid_differences)
|
|
|
|
# Apply thresholds
|
|
rating_level = calculate_rating_level(avg_difference, threshold_low, threshold_high)
|
|
|
|
return rating_level or PRICE_RATING_NORMAL, avg_difference
|
|
|
|
|
|
def aggregate_period_levels(interval_data_list: list[dict[str, Any]]) -> str | None:
|
|
"""
|
|
Aggregate price levels across multiple intervals in a period.
|
|
|
|
Extracts "level" from each interval and uses the same logic as
|
|
aggregate_price_levels() to determine the overall level for the period.
|
|
|
|
Args:
|
|
interval_data_list: List of price interval dictionaries with "level" keys
|
|
|
|
Returns:
|
|
The aggregated level string in lowercase (e.g., "very_cheap", "normal", "expensive"),
|
|
or None if no valid levels found
|
|
|
|
"""
|
|
levels: list[str] = []
|
|
for interval in interval_data_list:
|
|
level = interval.get("level")
|
|
if level is not None and isinstance(level, str):
|
|
levels.append(level)
|
|
|
|
if not levels:
|
|
return None
|
|
|
|
aggregated = aggregate_price_levels(levels)
|
|
# Convert to lowercase for consistency with other enum sensors
|
|
return aggregated.lower() if aggregated else None
|
|
|
|
|
|
def aggregate_period_ratings(
|
|
interval_data_list: list[dict[str, Any]],
|
|
threshold_low: float,
|
|
threshold_high: float,
|
|
) -> tuple[str | None, float | None]:
|
|
"""
|
|
Aggregate price ratings across multiple intervals in a period.
|
|
|
|
Extracts "difference" from each interval and uses the same logic as
|
|
aggregate_price_rating() to determine the overall rating for the period.
|
|
|
|
Args:
|
|
interval_data_list: List of price interval dictionaries with "difference" keys
|
|
threshold_low: The low threshold percentage for LOW rating
|
|
threshold_high: The high threshold percentage for HIGH rating
|
|
|
|
Returns:
|
|
Tuple of (rating_level, average_difference)
|
|
rating_level: "low", "normal", "high" (lowercase), or None if no valid data
|
|
average_difference: The averaged difference percentage, or None if no valid data
|
|
|
|
"""
|
|
differences: list[float] = []
|
|
for interval in interval_data_list:
|
|
diff = interval.get("difference")
|
|
if diff is not None:
|
|
differences.append(float(diff))
|
|
|
|
if not differences:
|
|
return None, None
|
|
|
|
rating_level, avg_diff = aggregate_price_rating(differences, threshold_low, threshold_high)
|
|
# Convert to lowercase for consistency with other enum sensors
|
|
return rating_level.lower() if rating_level else None, avg_diff
|
|
|
|
|
|
def calculate_price_trend(
|
|
current_price: float,
|
|
future_average: float,
|
|
threshold_rising: float = 5.0,
|
|
threshold_falling: float = -5.0,
|
|
) -> tuple[str, float]:
|
|
"""
|
|
Calculate price trend by comparing current price with future average.
|
|
|
|
Args:
|
|
current_price: Current interval price
|
|
future_average: Average price of future intervals
|
|
threshold_rising: Percentage threshold for rising trend (positive, default 5%)
|
|
threshold_falling: Percentage threshold for falling trend (negative, default -5%)
|
|
|
|
Returns:
|
|
Tuple of (trend_state, difference_percentage)
|
|
trend_state: "rising" | "falling" | "stable"
|
|
difference_percentage: % change from current to future ((future - current) / current * 100)
|
|
|
|
"""
|
|
if current_price == 0:
|
|
# Avoid division by zero
|
|
return "stable", 0.0
|
|
|
|
# Calculate percentage difference from current to future
|
|
diff_pct = ((future_average - current_price) / current_price) * 100
|
|
|
|
# Determine trend based on thresholds
|
|
# threshold_falling is negative, so we compare with it directly
|
|
if diff_pct > threshold_rising:
|
|
trend = "rising"
|
|
elif diff_pct < threshold_falling:
|
|
trend = "falling"
|
|
else:
|
|
trend = "stable"
|
|
|
|
return trend, diff_pct
|