hass.tibber_prices/custom_components/tibber_prices/utils/average.py
Julian Pawlowski 9c3c094305 fix(calculations): handle negative electricity prices correctly
Fixed multiple calculation issues with negative prices (Norway/Germany
renewable surplus scenarios):

Bug #6: Rating threshold validation with dead code
- Added threshold validation (low >= high) with warning
- Returns NORMAL as fallback for misconfigured thresholds

Bug #7: Min/Max functions returning 0.0 instead of None
- Changed default from 0.0 to None when window is empty
- Prevents misinterpretation (0.0 looks like price with negatives)

Bug #9: Period price diff percentage wrong sign with negative reference
- Use abs(ref_price) in percentage calculation
- Correct percentage direction for negative prices

Bug #10: Trend diff percentage wrong sign with negative current price
- Use abs(current_interval_price) in percentage calculation
- Correct trend direction when prices cross zero

Bug #11: later_half_diff calculation failed for negative prices
- Changed condition from `if current_interval_price > 0` to `!= 0`
- Use abs(current_interval_price) for percentage

Changes:
- utils/price.py: Add threshold validation, use abs() in percentages
- utils/average.py: Return None instead of 0.0 for empty windows
- period_statistics.py: Use abs() for reference prices
- trend.py: Use abs() for current prices, fix zero-check condition
- tests: 95+ new tests covering negative/zero/mixed price scenarios

Impact: All calculations work correctly with negative electricity prices.
Percentages show correct direction regardless of sign.
2025-11-22 04:45:23 +00:00

501 lines
16 KiB
Python

"""Utility functions for calculating price averages."""
from __future__ import annotations
from datetime import datetime, timedelta
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from custom_components.tibber_prices.coordinator.time_service import TibberPricesTimeService
def calculate_trailing_24h_avg(all_prices: list[dict], interval_start: datetime) -> float | None:
    """
    Calculate trailing 24-hour average price for a given interval.

    The window is half-open: ``[interval_start - 24h, interval_start)``. An entry
    counts when its ``startsAt`` lies inside that window, so the interval that
    begins exactly at ``interval_start`` is not part of the average.

    Args:
        all_prices: List of all price data (yesterday, today, tomorrow combined).
            Each entry is a dict read through its ``startsAt`` and ``total`` keys;
            ``startsAt`` must be comparable with ``interval_start``. Entries whose
            ``startsAt`` or ``total`` is missing or None are skipped.
        interval_start: Start time of the interval to calculate average for

    Returns:
        Average price for the 24 hours preceding the interval, or None if no data in window

    """
    window_end = interval_start
    window_start = window_end - timedelta(hours=24)

    prices_in_window = []
    for price_data in all_prices:
        # .get() so an incomplete entry is skipped instead of raising KeyError
        starts_at = price_data.get("startsAt")
        total = price_data.get("total")
        if starts_at is None or total is None:
            continue
        # Include intervals that start within the window (current interval excluded)
        if window_start <= starts_at < window_end:
            prices_in_window.append(float(total))

    # CRITICAL: Return None instead of 0.0 when no data available.
    # With negative prices, 0.0 could be misinterpreted as a real average value.
    if not prices_in_window:
        return None
    return sum(prices_in_window) / len(prices_in_window)
def calculate_leading_24h_avg(all_prices: list[dict], interval_start: datetime) -> float | None:
    """
    Calculate leading 24-hour average price for a given interval.

    The window is half-open: ``[interval_start, interval_start + 24h)``. An entry
    counts when its ``startsAt`` lies inside that window, so the interval that
    begins exactly at ``interval_start`` is included.

    Args:
        all_prices: List of all price data (yesterday, today, tomorrow combined).
            Each entry is a dict read through its ``startsAt`` and ``total`` keys;
            ``startsAt`` must be comparable with ``interval_start``. Entries whose
            ``startsAt`` or ``total`` is missing or None are skipped.
        interval_start: Start time of the interval to calculate average for

    Returns:
        Average price for up to 24 hours following the interval, or None if no data in window

    """
    window_start = interval_start
    window_end = window_start + timedelta(hours=24)

    prices_in_window = []
    for price_data in all_prices:
        # .get() so an incomplete entry is skipped instead of raising KeyError
        starts_at = price_data.get("startsAt")
        total = price_data.get("total")
        if starts_at is None or total is None:
            continue
        # Include intervals that start within the window
        if window_start <= starts_at < window_end:
            prices_in_window.append(float(total))

    # CRITICAL: Return None instead of 0.0 when no data available.
    # With negative prices, 0.0 could be misinterpreted as a real average value.
    if not prices_in_window:
        return None
    return sum(prices_in_window) / len(prices_in_window)
def calculate_current_trailing_avg(
    coordinator_data: dict,
    *,
    time: TibberPricesTimeService,
) -> float | None:
    """
    Return the trailing 24-hour average price as of the current time.

    Args:
        coordinator_data: The coordinator data containing priceInfo
        time: TibberPricesTimeService instance (required); its ``now()`` supplies
            the reference time

    Returns:
        Trailing 24-hour average relative to ``time.now()``, or None when the
        coordinator data is empty, contains no prices, or the window has no data

    """
    if not coordinator_data:
        return None

    by_day = coordinator_data.get("priceInfo", {})
    yesterday, today, tomorrow = (by_day.get(day, []) for day in ("yesterday", "today", "tomorrow"))
    merged = yesterday + today + tomorrow

    # The clock is only consulted once there is something to average.
    return calculate_trailing_24h_avg(merged, time.now()) if merged else None
def calculate_current_leading_avg(
    coordinator_data: dict,
    *,
    time: TibberPricesTimeService,
) -> float | None:
    """
    Return the leading 24-hour average price as of the current time.

    Args:
        coordinator_data: The coordinator data containing priceInfo
        time: TibberPricesTimeService instance (required); its ``now()`` supplies
            the reference time

    Returns:
        Leading 24-hour average relative to ``time.now()``, or None when the
        coordinator data is empty, contains no prices, or the window has no data

    """
    if not coordinator_data:
        return None

    by_day = coordinator_data.get("priceInfo", {})
    yesterday, today, tomorrow = (by_day.get(day, []) for day in ("yesterday", "today", "tomorrow"))
    merged = yesterday + today + tomorrow

    # The clock is only consulted once there is something to average.
    return calculate_leading_24h_avg(merged, time.now()) if merged else None
def calculate_trailing_24h_min(
    all_prices: list[dict],
    interval_start: datetime,
    *,
    time: TibberPricesTimeService,
) -> float | None:
    """
    Calculate trailing 24-hour minimum price for a given interval.

    The window is half-open: ``[interval_start - 24h, interval_start)``, so the
    interval beginning exactly at ``interval_start`` is excluded.

    Args:
        all_prices: List of all price data (yesterday, today, tomorrow combined).
            In-window entries whose ``total`` is missing or None are skipped.
        interval_start: Start time of the interval to calculate minimum for
        time: TibberPricesTimeService instance (required); its
            ``get_interval_time()`` resolves each entry's start time, and entries
            for which it returns None are skipped

    Returns:
        Minimum price for the 24 hours preceding the interval, or None if no data in window

    """
    window_end = interval_start
    window_start = window_end - timedelta(hours=24)

    prices_in_window = []
    for price_data in all_prices:
        starts_at = time.get_interval_time(price_data)
        if starts_at is None or not (window_start <= starts_at < window_end):
            continue
        # An interval without a price must not crash the whole calculation
        total = price_data.get("total")
        if total is not None:
            prices_in_window.append(float(total))

    # CRITICAL: None (not 0.0) for an empty window. With negative prices,
    # 0.0 would look like a real price value.
    return min(prices_in_window, default=None)
def calculate_trailing_24h_max(
    all_prices: list[dict],
    interval_start: datetime,
    *,
    time: TibberPricesTimeService,
) -> float | None:
    """
    Calculate trailing 24-hour maximum price for a given interval.

    The window is half-open: ``[interval_start - 24h, interval_start)``, so the
    interval beginning exactly at ``interval_start`` is excluded.

    Args:
        all_prices: List of all price data (yesterday, today, tomorrow combined).
            In-window entries whose ``total`` is missing or None are skipped.
        interval_start: Start time of the interval to calculate maximum for
        time: TibberPricesTimeService instance (required); its
            ``get_interval_time()`` resolves each entry's start time, and entries
            for which it returns None are skipped

    Returns:
        Maximum price for the 24 hours preceding the interval, or None if no data in window

    """
    window_end = interval_start
    window_start = window_end - timedelta(hours=24)

    prices_in_window = []
    for price_data in all_prices:
        starts_at = time.get_interval_time(price_data)
        if starts_at is None or not (window_start <= starts_at < window_end):
            continue
        # An interval without a price must not crash the whole calculation
        total = price_data.get("total")
        if total is not None:
            prices_in_window.append(float(total))

    # CRITICAL: None (not 0.0) for an empty window. With negative prices,
    # 0.0 would look like a real price value.
    return max(prices_in_window, default=None)
def calculate_leading_24h_min(
    all_prices: list[dict],
    interval_start: datetime,
    *,
    time: TibberPricesTimeService,
) -> float | None:
    """
    Calculate leading 24-hour minimum price for a given interval.

    The window is half-open: ``[interval_start, interval_start + 24h)``, so the
    interval beginning exactly at ``interval_start`` is included.

    Args:
        all_prices: List of all price data (yesterday, today, tomorrow combined).
            In-window entries whose ``total`` is missing or None are skipped.
        interval_start: Start time of the interval to calculate minimum for
        time: TibberPricesTimeService instance (required); its
            ``get_interval_time()`` resolves each entry's start time, and entries
            for which it returns None are skipped

    Returns:
        Minimum price for up to 24 hours following the interval, or None if no data in window

    """
    window_start = interval_start
    window_end = window_start + timedelta(hours=24)

    prices_in_window = []
    for price_data in all_prices:
        starts_at = time.get_interval_time(price_data)
        if starts_at is None or not (window_start <= starts_at < window_end):
            continue
        # An interval without a price must not crash the whole calculation
        total = price_data.get("total")
        if total is not None:
            prices_in_window.append(float(total))

    # CRITICAL: None (not 0.0) for an empty window. With negative prices,
    # 0.0 would look like a real price value.
    return min(prices_in_window, default=None)
def calculate_leading_24h_max(
    all_prices: list[dict],
    interval_start: datetime,
    *,
    time: TibberPricesTimeService,
) -> float | None:
    """
    Calculate leading 24-hour maximum price for a given interval.

    The window is half-open: ``[interval_start, interval_start + 24h)``, so the
    interval beginning exactly at ``interval_start`` is included.

    Args:
        all_prices: List of all price data (yesterday, today, tomorrow combined).
            In-window entries whose ``total`` is missing or None are skipped.
        interval_start: Start time of the interval to calculate maximum for
        time: TibberPricesTimeService instance (required); its
            ``get_interval_time()`` resolves each entry's start time, and entries
            for which it returns None are skipped

    Returns:
        Maximum price for up to 24 hours following the interval, or None if no data in window

    """
    window_start = interval_start
    window_end = window_start + timedelta(hours=24)

    prices_in_window = []
    for price_data in all_prices:
        starts_at = time.get_interval_time(price_data)
        if starts_at is None or not (window_start <= starts_at < window_end):
            continue
        # An interval without a price must not crash the whole calculation
        total = price_data.get("total")
        if total is not None:
            prices_in_window.append(float(total))

    # CRITICAL: None (not 0.0) for an empty window. With negative prices,
    # 0.0 would look like a real price value.
    return max(prices_in_window, default=None)
def calculate_current_trailing_min(
    coordinator_data: dict,
    *,
    time: TibberPricesTimeService,
) -> float | None:
    """
    Return the trailing 24-hour minimum price as of the current time.

    Args:
        coordinator_data: The coordinator data containing priceInfo
        time: TibberPricesTimeService instance (required); its ``now()`` supplies
            the reference time and it is passed on to the window calculation

    Returns:
        Trailing 24-hour minimum relative to ``time.now()``, or None when the
        coordinator data is empty, contains no prices, or the window has no data

    """
    if not coordinator_data:
        return None

    by_day = coordinator_data.get("priceInfo", {})
    yesterday, today, tomorrow = (by_day.get(day, []) for day in ("yesterday", "today", "tomorrow"))
    merged = yesterday + today + tomorrow

    # The clock is only consulted once there is something to evaluate.
    return calculate_trailing_24h_min(merged, time.now(), time=time) if merged else None
def calculate_current_trailing_max(
    coordinator_data: dict,
    *,
    time: TibberPricesTimeService,
) -> float | None:
    """
    Return the trailing 24-hour maximum price as of the current time.

    Args:
        coordinator_data: The coordinator data containing priceInfo
        time: TibberPricesTimeService instance (required); its ``now()`` supplies
            the reference time and it is passed on to the window calculation

    Returns:
        Trailing 24-hour maximum relative to ``time.now()``, or None when the
        coordinator data is empty, contains no prices, or the window has no data

    """
    if not coordinator_data:
        return None

    by_day = coordinator_data.get("priceInfo", {})
    yesterday, today, tomorrow = (by_day.get(day, []) for day in ("yesterday", "today", "tomorrow"))
    merged = yesterday + today + tomorrow

    # The clock is only consulted once there is something to evaluate.
    return calculate_trailing_24h_max(merged, time.now(), time=time) if merged else None
def calculate_current_leading_min(
    coordinator_data: dict,
    *,
    time: TibberPricesTimeService,
) -> float | None:
    """
    Return the leading 24-hour minimum price as of the current time.

    Args:
        coordinator_data: The coordinator data containing priceInfo
        time: TibberPricesTimeService instance (required); its ``now()`` supplies
            the reference time and it is passed on to the window calculation

    Returns:
        Leading 24-hour minimum relative to ``time.now()``, or None when the
        coordinator data is empty, contains no prices, or the window has no data

    """
    if not coordinator_data:
        return None

    by_day = coordinator_data.get("priceInfo", {})
    yesterday, today, tomorrow = (by_day.get(day, []) for day in ("yesterday", "today", "tomorrow"))
    merged = yesterday + today + tomorrow

    # The clock is only consulted once there is something to evaluate.
    return calculate_leading_24h_min(merged, time.now(), time=time) if merged else None
def calculate_current_leading_max(
    coordinator_data: dict,
    *,
    time: TibberPricesTimeService,
) -> float | None:
    """
    Return the leading 24-hour maximum price as of the current time.

    Args:
        coordinator_data: The coordinator data containing priceInfo
        time: TibberPricesTimeService instance (required); its ``now()`` supplies
            the reference time and it is passed on to the window calculation

    Returns:
        Leading 24-hour maximum relative to ``time.now()``, or None when the
        coordinator data is empty, contains no prices, or the window has no data

    """
    if not coordinator_data:
        return None

    by_day = coordinator_data.get("priceInfo", {})
    yesterday, today, tomorrow = (by_day.get(day, []) for day in ("yesterday", "today", "tomorrow"))
    merged = yesterday + today + tomorrow

    # The clock is only consulted once there is something to evaluate.
    return calculate_leading_24h_max(merged, time.now(), time=time) if merged else None
def calculate_next_n_hours_avg(
    coordinator_data: dict,
    hours: int,
    *,
    time: TibberPricesTimeService,
) -> float | None:
    """
    Average the prices of the intervals that follow the current one for N hours.

    The current interval itself is excluded; averaging starts with the entry
    directly after it in the combined yesterday/today/tomorrow list. The number
    of intervals that make up N hours comes from ``time.minutes_to_intervals()``.

    Args:
        coordinator_data: The coordinator data containing priceInfo
        hours: Number of hours to look ahead (1, 2, 3, 4, 5, 6, 8, 12, etc.)
        time: TibberPricesTimeService instance (required)

    Returns:
        Average over the upcoming intervals that are actually available (this may
        cover less than N hours when the price list ends early). None when the
        input is empty, ``hours`` is not positive, no current interval is found,
        or none of the upcoming intervals carries a price.

    """
    if not coordinator_data or hours <= 0:
        return None

    by_day = coordinator_data.get("priceInfo", {})
    yesterday, today, tomorrow = (by_day.get(day, []) for day in ("yesterday", "today", "tomorrow"))
    timeline = yesterday + today + tomorrow
    if not timeline:
        return None

    # Locate the entry covering the present moment; give up if there is none.
    for current_pos, entry in enumerate(timeline):
        begins = time.get_interval_time(entry)
        if begins is None:
            continue
        ends = begins + time.get_interval_duration()
        if time.is_current_interval(begins, ends):
            break
    else:
        return None

    # Look at the NEXT interval onwards, never past the end of the list.
    span = time.minutes_to_intervals(hours * 60)
    first = current_pos + 1
    entry_count = len(timeline)
    totals = (timeline[pos].get("total") for pos in range(first, first + span) if pos < entry_count)
    upcoming = [float(total) for total in totals if total is not None]

    # A partial window still yields an average; only a completely empty one does not.
    if not upcoming:
        return None
    return sum(upcoming) / len(upcoming)