hass.tibber_prices/custom_components/tibber_prices/price_utils.py
2025-11-03 20:55:28 +00:00

345 lines
11 KiB
Python

"""Utility functions for price data calculations."""
from __future__ import annotations
import logging
from datetime import datetime, timedelta
from typing import Any
from homeassistant.util import dt as dt_util
from .const import PRICE_LEVEL_MAPPING, PRICE_LEVEL_NORMAL, PRICE_RATING_NORMAL
_LOGGER = logging.getLogger(__name__)
MINUTES_PER_INTERVAL = 15
def calculate_trailing_average_for_interval(
interval_start: datetime,
all_prices: list[dict[str, Any]],
) -> float | None:
"""
Calculate the trailing 24-hour average price for a specific interval.
Args:
interval_start: The start time of the interval we're calculating for
all_prices: List of all available price intervals (yesterday + today + tomorrow)
Returns:
The average price of all intervals in the 24 hours before interval_start,
or None if insufficient data is available.
"""
if not all_prices:
return None
# Calculate the lookback period: 24 hours before this interval
lookback_start = interval_start - timedelta(hours=24)
# Collect all prices that fall within the 24-hour lookback window
matching_prices = []
for price_data in all_prices:
starts_at_str = price_data.get("startsAt")
if not starts_at_str:
continue
# Parse the timestamp
price_time = dt_util.parse_datetime(starts_at_str)
if price_time is None:
continue
# Convert to local timezone for comparison
price_time = dt_util.as_local(price_time)
# Check if this price falls within our lookback window
# Include prices that start >= lookback_start and start < interval_start
if lookback_start <= price_time < interval_start:
total_price = price_data.get("total")
if total_price is not None:
matching_prices.append(float(total_price))
if not matching_prices:
_LOGGER.debug(
"No prices found in 24-hour lookback window for interval starting at %s (lookback: %s to %s)",
interval_start,
lookback_start,
interval_start,
)
return None
# Calculate and return the average
return sum(matching_prices) / len(matching_prices)
def calculate_difference_percentage(
current_price: float,
trailing_average: float | None,
) -> float | None:
"""
Calculate the difference percentage between current price and trailing average.
This mimics the API's "difference" field from priceRating endpoint.
Args:
current_price: The current interval's price
trailing_average: The 24-hour trailing average price
Returns:
The percentage difference: ((current - average) / average) * 100
or None if trailing_average is None or zero.
"""
if trailing_average is None or trailing_average == 0:
return None
return ((current_price - trailing_average) / trailing_average) * 100
def calculate_rating_level(
difference: float | None,
threshold_low: float,
threshold_high: float,
) -> str | None:
"""
Calculate the rating level based on difference percentage and thresholds.
This mimics the API's "level" field from priceRating endpoint.
Args:
difference: The difference percentage (from calculate_difference_percentage)
threshold_low: The low threshold percentage (typically -100 to 0)
threshold_high: The high threshold percentage (typically 0 to 100)
Returns:
"LOW" if difference <= threshold_low
"HIGH" if difference >= threshold_high
"NORMAL" otherwise
None if difference is None
"""
if difference is None:
return None
# If difference falls in both ranges (shouldn't normally happen), return NORMAL
if difference <= threshold_low and difference >= threshold_high:
return "NORMAL"
# Classify based on thresholds
if difference <= threshold_low:
return "LOW"
if difference >= threshold_high:
return "HIGH"
return "NORMAL"
def _process_price_interval(
price_interval: dict[str, Any],
all_prices: list[dict[str, Any]],
threshold_low: float,
threshold_high: float,
day_label: str,
) -> None:
"""
Process a single price interval and add difference and rating_level.
Args:
price_interval: The price interval to process (modified in place)
all_prices: All available price intervals for lookback calculation
threshold_low: Low threshold percentage
threshold_high: High threshold percentage
day_label: Label for logging ("today" or "tomorrow")
"""
starts_at_str = price_interval.get("startsAt")
if not starts_at_str:
return
starts_at = dt_util.parse_datetime(starts_at_str)
if starts_at is None:
return
starts_at = dt_util.as_local(starts_at)
current_price = price_interval.get("total")
if current_price is None:
return
# Calculate trailing average
trailing_avg = calculate_trailing_average_for_interval(starts_at, all_prices)
# Calculate and set the difference and rating_level
if trailing_avg is not None:
difference = calculate_difference_percentage(float(current_price), trailing_avg)
price_interval["difference"] = difference
# Calculate rating_level based on difference
rating_level = calculate_rating_level(difference, threshold_low, threshold_high)
price_interval["rating_level"] = rating_level
else:
# Set to None if we couldn't calculate
price_interval["difference"] = None
price_interval["rating_level"] = None
_LOGGER.debug(
"Could not calculate trailing average for %s interval %s",
day_label,
starts_at,
)
def enrich_price_info_with_differences(
price_info: dict[str, Any],
threshold_low: float | None = None,
threshold_high: float | None = None,
) -> dict[str, Any]:
"""
Enrich price info with calculated 'difference' and 'rating_level' values.
Computes the trailing 24-hour average, difference percentage, and rating level
for each interval in today and tomorrow (excluding yesterday since it's historical).
Args:
price_info: Dictionary with 'yesterday', 'today', 'tomorrow' keys
threshold_low: Low threshold percentage for rating_level (defaults to -10)
threshold_high: High threshold percentage for rating_level (defaults to 10)
Returns:
Updated price_info dict with 'difference' and 'rating_level' added
"""
if threshold_low is None:
threshold_low = -10
if threshold_high is None:
threshold_high = 10
yesterday_prices = price_info.get("yesterday", [])
today_prices = price_info.get("today", [])
tomorrow_prices = price_info.get("tomorrow", [])
# Combine all prices for lookback calculation
all_prices = yesterday_prices + today_prices + tomorrow_prices
_LOGGER.debug(
"Enriching price info with differences and rating levels: "
"yesterday=%d, today=%d, tomorrow=%d, thresholds: low=%.2f, high=%.2f",
len(yesterday_prices),
len(today_prices),
len(tomorrow_prices),
threshold_low,
threshold_high,
)
# Process today's prices
for price_interval in today_prices:
_process_price_interval(price_interval, all_prices, threshold_low, threshold_high, "today")
# Process tomorrow's prices
for price_interval in tomorrow_prices:
_process_price_interval(price_interval, all_prices, threshold_low, threshold_high, "tomorrow")
return price_info
def find_price_data_for_interval(price_info: Any, target_time: datetime) -> dict | None:
"""
Find the price data for a specific 15-minute interval timestamp.
Args:
price_info: The price info dictionary from Tibber API
target_time: The target timestamp to find price data for
Returns:
Price data dict if found, None otherwise
"""
day_key = "tomorrow" if target_time.date() > dt_util.now().date() else "today"
search_days = [day_key, "tomorrow" if day_key == "today" else "today"]
for search_day in search_days:
day_prices = price_info.get(search_day, [])
if not day_prices:
continue
for price_data in day_prices:
starts_at = dt_util.parse_datetime(price_data["startsAt"])
if starts_at is None:
continue
starts_at = dt_util.as_local(starts_at)
interval_end = starts_at + timedelta(minutes=MINUTES_PER_INTERVAL)
if starts_at <= target_time < interval_end and starts_at.date() == target_time.date():
return price_data
return None
def aggregate_price_levels(levels: list[str]) -> str:
"""
Aggregate multiple price levels into a single representative level using median.
Takes a list of price level strings (e.g., "VERY_CHEAP", "NORMAL", "EXPENSIVE")
and returns the median level after sorting by numeric values. This naturally
tends toward "NORMAL" when levels are mixed.
Args:
levels: List of price level strings from intervals
Returns:
The median price level string, or "NORMAL" if input is empty
"""
if not levels:
return PRICE_LEVEL_NORMAL
# Convert levels to numeric values and sort
numeric_values = [PRICE_LEVEL_MAPPING.get(level, 0) for level in levels]
numeric_values.sort()
# Get median (middle value for odd length, lower-middle for even length)
median_idx = len(numeric_values) // 2
median_value = numeric_values[median_idx]
# Convert back to level string
for level, value in PRICE_LEVEL_MAPPING.items():
if value == median_value:
return level
return PRICE_LEVEL_NORMAL
def aggregate_price_rating(differences: list[float], threshold_low: float, threshold_high: float) -> tuple[str, float]:
"""
Aggregate multiple price differences into a single rating level.
Calculates the average difference percentage across multiple intervals
and applies thresholds to determine the overall rating level.
Args:
differences: List of difference percentages from intervals
threshold_low: The low threshold percentage for LOW rating
threshold_high: The high threshold percentage for HIGH rating
Returns:
Tuple of (rating_level, average_difference)
rating_level: "LOW", "NORMAL", or "HIGH"
average_difference: The averaged difference percentage
"""
if not differences:
return PRICE_RATING_NORMAL, 0.0
# Filter out None values
valid_differences = [d for d in differences if d is not None]
if not valid_differences:
return PRICE_RATING_NORMAL, 0.0
# Calculate average difference
avg_difference = sum(valid_differences) / len(valid_differences)
# Apply thresholds
rating_level = calculate_rating_level(avg_difference, threshold_low, threshold_high)
return rating_level or PRICE_RATING_NORMAL, avg_difference