"""Utility functions for calculating price averages.""" from __future__ import annotations from datetime import datetime, timedelta from homeassistant.util import dt as dt_util def calculate_trailing_24h_avg(all_prices: list[dict], interval_start: datetime) -> float: """ Calculate trailing 24-hour average price for a given interval. Args: all_prices: List of all price data (yesterday, today, tomorrow combined) interval_start: Start time of the interval to calculate average for Returns: Average price for the 24 hours preceding the interval (not including the interval itself) """ # Define the 24-hour window: from 24 hours before interval_start up to interval_start window_start = interval_start - timedelta(hours=24) window_end = interval_start # Filter prices within the 24-hour window prices_in_window = [] for price_data in all_prices: starts_at = dt_util.parse_datetime(price_data["startsAt"]) if starts_at is None: continue starts_at = dt_util.as_local(starts_at) # Include intervals that start within the window (not including the current interval's end) if window_start <= starts_at < window_end: prices_in_window.append(float(price_data["total"])) # Calculate average if prices_in_window: return sum(prices_in_window) / len(prices_in_window) return 0.0 def calculate_leading_24h_avg(all_prices: list[dict], interval_start: datetime) -> float: """ Calculate leading 24-hour average price for a given interval. Args: all_prices: List of all price data (yesterday, today, tomorrow combined) interval_start: Start time of the interval to calculate average for Returns: Average price for up to 24 hours following the interval (including the interval itself) """ # Define the 24-hour window: from interval_start up to 24 hours after window_start = interval_start window_end = interval_start + timedelta(hours=24) # Filter prices within the 24-hour window prices_in_window = [] for price_data in all_prices: starts_at = dt_util.parse_datetime(price_data["startsAt"]) if starts_at is None: continue starts_at = dt_util.as_local(starts_at) # Include intervals that start within the window if window_start <= starts_at < window_end: prices_in_window.append(float(price_data["total"])) # Calculate average if prices_in_window: return sum(prices_in_window) / len(prices_in_window) return 0.0 def calculate_current_trailing_avg(coordinator_data: dict) -> float | None: """ Calculate the trailing 24-hour average for the current time. Args: coordinator_data: The coordinator data containing priceInfo Returns: Current trailing 24-hour average price, or None if unavailable """ if not coordinator_data: return None price_info = coordinator_data.get("priceInfo", {}) yesterday_prices = price_info.get("yesterday", []) today_prices = price_info.get("today", []) tomorrow_prices = price_info.get("tomorrow", []) all_prices = yesterday_prices + today_prices + tomorrow_prices if not all_prices: return None now = dt_util.now() return calculate_trailing_24h_avg(all_prices, now) def calculate_current_leading_avg(coordinator_data: dict) -> float | None: """ Calculate the leading 24-hour average for the current time. Args: coordinator_data: The coordinator data containing priceInfo Returns: Current leading 24-hour average price, or None if unavailable """ if not coordinator_data: return None price_info = coordinator_data.get("priceInfo", {}) yesterday_prices = price_info.get("yesterday", []) today_prices = price_info.get("today", []) tomorrow_prices = price_info.get("tomorrow", []) all_prices = yesterday_prices + today_prices + tomorrow_prices if not all_prices: return None now = dt_util.now() return calculate_leading_24h_avg(all_prices, now) def calculate_trailing_24h_min(all_prices: list[dict], interval_start: datetime) -> float: """ Calculate trailing 24-hour minimum price for a given interval. Args: all_prices: List of all price data (yesterday, today, tomorrow combined) interval_start: Start time of the interval to calculate minimum for Returns: Minimum price for the 24 hours preceding the interval (not including the interval itself) """ # Define the 24-hour window: from 24 hours before interval_start up to interval_start window_start = interval_start - timedelta(hours=24) window_end = interval_start # Filter prices within the 24-hour window prices_in_window = [] for price_data in all_prices: starts_at = dt_util.parse_datetime(price_data["startsAt"]) if starts_at is None: continue starts_at = dt_util.as_local(starts_at) # Include intervals that start within the window (not including the current interval's end) if window_start <= starts_at < window_end: prices_in_window.append(float(price_data["total"])) # Calculate minimum if prices_in_window: return min(prices_in_window) return 0.0 def calculate_trailing_24h_max(all_prices: list[dict], interval_start: datetime) -> float: """ Calculate trailing 24-hour maximum price for a given interval. Args: all_prices: List of all price data (yesterday, today, tomorrow combined) interval_start: Start time of the interval to calculate maximum for Returns: Maximum price for the 24 hours preceding the interval (not including the interval itself) """ # Define the 24-hour window: from 24 hours before interval_start up to interval_start window_start = interval_start - timedelta(hours=24) window_end = interval_start # Filter prices within the 24-hour window prices_in_window = [] for price_data in all_prices: starts_at = dt_util.parse_datetime(price_data["startsAt"]) if starts_at is None: continue starts_at = dt_util.as_local(starts_at) # Include intervals that start within the window (not including the current interval's end) if window_start <= starts_at < window_end: prices_in_window.append(float(price_data["total"])) # Calculate maximum if prices_in_window: return max(prices_in_window) return 0.0 def calculate_leading_24h_min(all_prices: list[dict], interval_start: datetime) -> float: """ Calculate leading 24-hour minimum price for a given interval. Args: all_prices: List of all price data (yesterday, today, tomorrow combined) interval_start: Start time of the interval to calculate minimum for Returns: Minimum price for up to 24 hours following the interval (including the interval itself) """ # Define the 24-hour window: from interval_start up to 24 hours after window_start = interval_start window_end = interval_start + timedelta(hours=24) # Filter prices within the 24-hour window prices_in_window = [] for price_data in all_prices: starts_at = dt_util.parse_datetime(price_data["startsAt"]) if starts_at is None: continue starts_at = dt_util.as_local(starts_at) # Include intervals that start within the window if window_start <= starts_at < window_end: prices_in_window.append(float(price_data["total"])) # Calculate minimum if prices_in_window: return min(prices_in_window) return 0.0 def calculate_leading_24h_max(all_prices: list[dict], interval_start: datetime) -> float: """ Calculate leading 24-hour maximum price for a given interval. Args: all_prices: List of all price data (yesterday, today, tomorrow combined) interval_start: Start time of the interval to calculate maximum for Returns: Maximum price for up to 24 hours following the interval (including the interval itself) """ # Define the 24-hour window: from interval_start up to 24 hours after window_start = interval_start window_end = interval_start + timedelta(hours=24) # Filter prices within the 24-hour window prices_in_window = [] for price_data in all_prices: starts_at = dt_util.parse_datetime(price_data["startsAt"]) if starts_at is None: continue starts_at = dt_util.as_local(starts_at) # Include intervals that start within the window if window_start <= starts_at < window_end: prices_in_window.append(float(price_data["total"])) # Calculate maximum if prices_in_window: return max(prices_in_window) return 0.0 def calculate_current_trailing_min(coordinator_data: dict) -> float | None: """ Calculate the trailing 24-hour minimum for the current time. Args: coordinator_data: The coordinator data containing priceInfo Returns: Current trailing 24-hour minimum price, or None if unavailable """ if not coordinator_data: return None price_info = coordinator_data.get("priceInfo", {}) yesterday_prices = price_info.get("yesterday", []) today_prices = price_info.get("today", []) tomorrow_prices = price_info.get("tomorrow", []) all_prices = yesterday_prices + today_prices + tomorrow_prices if not all_prices: return None now = dt_util.now() return calculate_trailing_24h_min(all_prices, now) def calculate_current_trailing_max(coordinator_data: dict) -> float | None: """ Calculate the trailing 24-hour maximum for the current time. Args: coordinator_data: The coordinator data containing priceInfo Returns: Current trailing 24-hour maximum price, or None if unavailable """ if not coordinator_data: return None price_info = coordinator_data.get("priceInfo", {}) yesterday_prices = price_info.get("yesterday", []) today_prices = price_info.get("today", []) tomorrow_prices = price_info.get("tomorrow", []) all_prices = yesterday_prices + today_prices + tomorrow_prices if not all_prices: return None now = dt_util.now() return calculate_trailing_24h_max(all_prices, now) def calculate_current_leading_min(coordinator_data: dict) -> float | None: """ Calculate the leading 24-hour minimum for the current time. Args: coordinator_data: The coordinator data containing priceInfo Returns: Current leading 24-hour minimum price, or None if unavailable """ if not coordinator_data: return None price_info = coordinator_data.get("priceInfo", {}) yesterday_prices = price_info.get("yesterday", []) today_prices = price_info.get("today", []) tomorrow_prices = price_info.get("tomorrow", []) all_prices = yesterday_prices + today_prices + tomorrow_prices if not all_prices: return None now = dt_util.now() return calculate_leading_24h_min(all_prices, now) def calculate_current_leading_max(coordinator_data: dict) -> float | None: """ Calculate the leading 24-hour maximum for the current time. Args: coordinator_data: The coordinator data containing priceInfo Returns: Current leading 24-hour maximum price, or None if unavailable """ if not coordinator_data: return None price_info = coordinator_data.get("priceInfo", {}) yesterday_prices = price_info.get("yesterday", []) today_prices = price_info.get("today", []) tomorrow_prices = price_info.get("tomorrow", []) all_prices = yesterday_prices + today_prices + tomorrow_prices if not all_prices: return None now = dt_util.now() return calculate_leading_24h_max(all_prices, now) def calculate_current_rolling_5interval_avg(coordinator_data: dict) -> float | None: """ Calculate rolling 5-interval average (2 previous + current + 2 next intervals). This provides a smoothed "hour price" that adapts as time moves, rather than being fixed to clock hours. With 15-minute intervals, this covers a 75-minute window (37.5 minutes before and after the current interval). Args: coordinator_data: The coordinator data containing priceInfo Returns: Average price of the 5 intervals, or None if unavailable """ if not coordinator_data: return None price_info = coordinator_data.get("priceInfo", {}) yesterday_prices = price_info.get("yesterday", []) today_prices = price_info.get("today", []) tomorrow_prices = price_info.get("tomorrow", []) all_prices = yesterday_prices + today_prices + tomorrow_prices if not all_prices: return None now = dt_util.now() # Find the current interval current_idx = None for idx, price_data in enumerate(all_prices): starts_at = dt_util.parse_datetime(price_data["startsAt"]) if starts_at is None: continue starts_at = dt_util.as_local(starts_at) interval_end = starts_at + timedelta(minutes=15) if starts_at <= now < interval_end: current_idx = idx break if current_idx is None: return None # Collect prices from 2 intervals before to 2 intervals after (5 total) prices_in_window = [] for offset in range(-2, 3): # -2, -1, 0, 1, 2 idx = current_idx + offset if 0 <= idx < len(all_prices): price = all_prices[idx].get("total") if price is not None: prices_in_window.append(float(price)) # Calculate average if prices_in_window: return sum(prices_in_window) / len(prices_in_window) return None def calculate_next_hour_rolling_5interval_avg(coordinator_data: dict) -> float | None: """ Calculate rolling 5-interval average for the next hour (shifted by 4 intervals). This provides the same smoothed "hour price" as the current hour sensor, but looks ahead to the next hour. With 15-minute intervals, this shifts the 5-interval window forward by 60 minutes (4 intervals). Args: coordinator_data: The coordinator data containing priceInfo Returns: Average price of the 5 intervals one hour ahead, or None if unavailable """ if not coordinator_data: return None price_info = coordinator_data.get("priceInfo", {}) yesterday_prices = price_info.get("yesterday", []) today_prices = price_info.get("today", []) tomorrow_prices = price_info.get("tomorrow", []) all_prices = yesterday_prices + today_prices + tomorrow_prices if not all_prices: return None now = dt_util.now() # Find the current interval current_idx = None for idx, price_data in enumerate(all_prices): starts_at = dt_util.parse_datetime(price_data["startsAt"]) if starts_at is None: continue starts_at = dt_util.as_local(starts_at) interval_end = starts_at + timedelta(minutes=15) if starts_at <= now < interval_end: current_idx = idx break if current_idx is None: return None # Shift forward by 4 intervals (1 hour) to get the "next hour" center point next_hour_idx = current_idx + 4 # Collect prices from 2 intervals before to 2 intervals after the next hour center (5 total) # This means: current_idx + 2, +3, +4, +5, +6 prices_in_window = [] for offset in range(-2, 3): # -2, -1, 0, 1, 2 relative to next_hour_idx idx = next_hour_idx + offset if 0 <= idx < len(all_prices): price = all_prices[idx].get("total") if price is not None: prices_in_window.append(float(price)) # Calculate average if prices_in_window: return sum(prices_in_window) / len(prices_in_window) return None