From f9f4908748802c38191814f8194f3ceb630e35bf Mon Sep 17 00:00:00 2001 From: Julian Pawlowski Date: Sat, 8 Nov 2025 15:01:25 +0000 Subject: [PATCH] refactor: Enhance period calculations with aggregated levels and ratings --- .../tibber_prices/binary_sensor.py | 94 +++++++++++++--- .../tibber_prices/coordinator.py | 24 ++++- .../tibber_prices/period_utils.py | 102 +++++++++++++++--- .../tibber_prices/price_utils.py | 65 +++++++++++ 4 files changed, 253 insertions(+), 32 deletions(-) diff --git a/custom_components/tibber_prices/binary_sensor.py b/custom_components/tibber_prices/binary_sensor.py index ef15eb1..c165dc8 100644 --- a/custom_components/tibber_prices/binary_sensor.py +++ b/custom_components/tibber_prices/binary_sensor.py @@ -347,8 +347,8 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity): # Find current or next interval current_interval = self._find_current_or_next_interval(intervals) - # Build periods summary - periods_summary = self._build_periods_summary(intervals) + # Build periods summary (merge with original summaries to include level/rating_level) + periods_summary = self._build_periods_summary(intervals, period_summaries) # Build final attributes return self._build_final_attributes(current_interval, periods_summary, intervals) @@ -369,16 +369,29 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity): return interval.copy() return None - def _build_periods_summary(self, intervals: list[dict]) -> list[dict]: + def _build_periods_summary(self, intervals: list[dict], original_summaries: list[dict]) -> list[dict]: """ Build a summary of periods with consistent attribute structure. Returns a list of period summaries with the same attributes as top-level, making the structure predictable and easy to use in automations. + + Args: + intervals: List of interval dictionaries with period information + original_summaries: Original period summaries from coordinator (with level/rating_level) + """ if not intervals: return [] + # Build a lookup for original summaries by start time + original_lookup: dict[str, dict] = {} + for summary in original_summaries: + start = summary.get("start") + if start: + key = start.isoformat() if hasattr(start, "isoformat") else str(start) + original_lookup[key] = summary + # Group intervals by period (they have the same period_start) periods_dict: dict[str, list[dict]] = {} for interval in intervals: @@ -398,24 +411,41 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity): first = period_intervals[0] prices = [i["price"] for i in period_intervals if "price" in i] - # Use same attribute names as top-level for consistency + # Get level and rating_level from original summaries first + aggregated_level = None + aggregated_rating_level = None + period_start = first.get("period_start") + if period_start: + key = period_start.isoformat() if hasattr(period_start, "isoformat") else str(period_start) + original = original_lookup.get(key) + if original: + aggregated_level = original.get("level") + aggregated_rating_level = original.get("rating_level") + + # Optimized attribute order: time → core decisions → prices → details → meta summary = { + # Time information "start": first.get("period_start"), "end": first.get("period_end"), + "duration_minutes": first.get("duration_minutes"), + # Core decision attributes + "level": aggregated_level, + "rating_level": aggregated_rating_level, + # Price statistics + "price_avg": round(sum(prices) / len(prices), 2) if prices else 0, + "price_min": round(min(prices), 2) if prices else 0, + "price_max": round(max(prices), 2) if prices else 0, + # Detail information "hour": first.get("hour"), "minute": first.get("minute"), "time": first.get("time"), - "duration_minutes": first.get("duration_minutes"), "periods_total": first.get("periods_total"), "periods_remaining": first.get("periods_remaining"), "period_position": first.get("period_position"), "intervals_count": len(period_intervals), - "price_avg": round(sum(prices) / len(prices), 2) if prices else 0, - "price_min": round(min(prices), 2) if prices else 0, - "price_max": round(max(prices), 2) if prices else 0, } - # Add price_diff attributes if present + # Add price_diff attributes if present (after details) self._add_price_diff_for_period(summary, period_intervals, first) summaries.append(summary) @@ -449,9 +479,47 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity): break if current_period_summary: - # Copy all attributes from the period summary - attributes = {"timestamp": timestamp} - attributes.update(current_period_summary) + # Build attributes with optimized order: time → core decisions → prices → details → meta + attributes = { + # Time information + "timestamp": timestamp, + "start": current_period_summary.get("start"), + "end": current_period_summary.get("end"), + "duration_minutes": current_period_summary.get("duration_minutes"), + # Core decision attributes + "level": current_period_summary.get("level"), + "rating_level": current_period_summary.get("rating_level"), + # Price statistics + "price_avg": current_period_summary.get("price_avg"), + "price_min": current_period_summary.get("price_min"), + "price_max": current_period_summary.get("price_max"), + # Detail information + "hour": current_period_summary.get("hour"), + "minute": current_period_summary.get("minute"), + "time": current_period_summary.get("time"), + "periods_total": current_period_summary.get("periods_total"), + "periods_remaining": current_period_summary.get("periods_remaining"), + "period_position": current_period_summary.get("period_position"), + "intervals_count": current_period_summary.get("intervals_count"), + } + + # Add period price_diff attributes if present + if "period_price_diff_from_daily_min" in current_period_summary: + attributes["period_price_diff_from_daily_min"] = current_period_summary[ + "period_price_diff_from_daily_min" + ] + if "period_price_diff_from_daily_min_%" in current_period_summary: + attributes["period_price_diff_from_daily_min_%"] = current_period_summary[ + "period_price_diff_from_daily_min_%" + ] + elif "period_price_diff_from_daily_max" in current_period_summary: + attributes["period_price_diff_from_daily_max"] = current_period_summary[ + "period_price_diff_from_daily_max" + ] + if "period_price_diff_from_daily_max_%" in current_period_summary: + attributes["period_price_diff_from_daily_max_%"] = current_period_summary[ + "period_price_diff_from_daily_max_%" + ] # Add interval-specific price_diff attributes (separate from period average) # Shows the reference interval's position vs daily min/max: @@ -465,8 +533,8 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity): attributes["interval_price_diff_from_daily_max"] = current_interval["price_diff_from_max"] attributes["interval_price_diff_from_daily_max_%"] = current_interval.get("price_diff_from_max_%") + # Meta information at the end attributes["periods"] = periods_summary - attributes["intervals_count"] = len(filtered_result) return attributes # Fallback if current period not found in summary diff --git a/custom_components/tibber_prices/coordinator.py b/custom_components/tibber_prices/coordinator.py index 2d0cc12..fb461f2 100644 --- a/custom_components/tibber_prices/coordinator.py +++ b/custom_components/tibber_prices/coordinator.py @@ -43,7 +43,7 @@ from .const import ( DEFAULT_PRICE_RATING_THRESHOLD_LOW, DOMAIN, ) -from .period_utils import calculate_periods +from .period_utils import PeriodConfig, calculate_periods from .price_utils import ( enrich_price_info_with_differences, find_price_data_for_interval, @@ -738,25 +738,39 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]): }, } + # Get rating thresholds from config + threshold_low = self.config_entry.options.get( + CONF_PRICE_RATING_THRESHOLD_LOW, + DEFAULT_PRICE_RATING_THRESHOLD_LOW, + ) + threshold_high = self.config_entry.options.get( + CONF_PRICE_RATING_THRESHOLD_HIGH, + DEFAULT_PRICE_RATING_THRESHOLD_HIGH, + ) + # Calculate best price periods best_config = self._get_period_config(reverse_sort=False) - best_periods = calculate_periods( - all_prices, + best_period_config = PeriodConfig( reverse_sort=False, flex=best_config["flex"], min_distance_from_avg=best_config["min_distance_from_avg"], min_period_length=best_config["min_period_length"], + threshold_low=threshold_low, + threshold_high=threshold_high, ) + best_periods = calculate_periods(all_prices, config=best_period_config) # Calculate peak price periods peak_config = self._get_period_config(reverse_sort=True) - peak_periods = calculate_periods( - all_prices, + peak_period_config = PeriodConfig( reverse_sort=True, flex=peak_config["flex"], min_distance_from_avg=peak_config["min_distance_from_avg"], min_period_length=peak_config["min_period_length"], + threshold_low=threshold_low, + threshold_high=threshold_high, ) + peak_periods = calculate_periods(all_prices, config=peak_period_config) return { "best_price": best_periods, diff --git a/custom_components/tibber_prices/period_utils.py b/custom_components/tibber_prices/period_utils.py index 639768a..0386524 100644 --- a/custom_components/tibber_prices/period_utils.py +++ b/custom_components/tibber_prices/period_utils.py @@ -4,22 +4,33 @@ from __future__ import annotations import logging from datetime import date, timedelta -from typing import Any +from typing import Any, NamedTuple from homeassistant.util import dt as dt_util +from .const import DEFAULT_PRICE_RATING_THRESHOLD_HIGH, DEFAULT_PRICE_RATING_THRESHOLD_LOW +from .price_utils import aggregate_period_levels, aggregate_period_ratings + _LOGGER = logging.getLogger(__name__) MINUTES_PER_INTERVAL = 15 +class PeriodConfig(NamedTuple): + """Configuration for period calculation.""" + + reverse_sort: bool + flex: float + min_distance_from_avg: float + min_period_length: int + threshold_low: float = DEFAULT_PRICE_RATING_THRESHOLD_LOW + threshold_high: float = DEFAULT_PRICE_RATING_THRESHOLD_HIGH + + def calculate_periods( all_prices: list[dict], *, - reverse_sort: bool, - flex: float, - min_distance_from_avg: float, - min_period_length: int, + config: PeriodConfig, ) -> dict[str, Any]: """ Calculate price periods (best or peak) from price data. @@ -37,10 +48,8 @@ def calculate_periods( Args: all_prices: All price data points from yesterday/today/tomorrow - reverse_sort: True for peak price (max reference), False for best price (min reference) - flex: Flexibility threshold as decimal (e.g., 0.05 = 5%) - min_distance_from_avg: Minimum distance from average as percentage (e.g., 10.0 = 10%) - min_period_length: Minimum period length in minutes + config: Period configuration containing reverse_sort, flex, min_distance_from_avg, + min_period_length, threshold_low, and threshold_high Returns: Dict with: @@ -49,6 +58,14 @@ def calculate_periods( - reference_data: Daily min/max/avg for on-demand annotation """ + # Extract config values + reverse_sort = config.reverse_sort + flex = config.flex + min_distance_from_avg = config.min_distance_from_avg + min_period_length = config.min_period_length + threshold_low = config.threshold_low + threshold_high = config.threshold_high + if not all_prices: return { "periods": [], @@ -100,7 +117,12 @@ def calculate_periods( # Step 8: Extract lightweight period summaries (no full price data) # Note: Filtering for current/future is done here based on end date, # not start date. This preserves periods that started yesterday but end today. - period_summaries = _extract_period_summaries(raw_periods) + period_summaries = _extract_period_summaries( + raw_periods, + all_prices_sorted, + threshold_low=threshold_low, + threshold_high=threshold_high, + ) return { "periods": period_summaries, # Lightweight summaries only @@ -328,7 +350,13 @@ def _filter_periods_by_end_date(periods: list[list[dict]]) -> list[list[dict]]: return filtered -def _extract_period_summaries(periods: list[list[dict]]) -> list[dict]: +def _extract_period_summaries( + periods: list[list[dict]], + all_prices: list[dict], + *, + threshold_low: float | None, + threshold_high: float | None, +) -> list[dict]: """ Extract lightweight period summaries without storing full price data. @@ -336,9 +364,26 @@ def _extract_period_summaries(periods: list[list[dict]]) -> list[dict]: - start/end timestamps - interval count - duration + - aggregated level (from API's "level" field) + - aggregated rating_level (from calculated "rating_level" field) Sensors can use these summaries to query the actual price data from priceInfo on demand. + + Args: + periods: List of periods, where each period is a list of interval dictionaries + all_prices: All price data from the API (enriched with level, difference, rating_level) + threshold_low: Low threshold for rating level calculation + threshold_high: High threshold for rating level calculation + """ + # Build lookup dictionary for full price data by timestamp + price_lookup: dict[str, dict] = {} + for price_data in all_prices: + starts_at = dt_util.parse_datetime(price_data["startsAt"]) + if starts_at: + starts_at = dt_util.as_local(starts_at) + price_lookup[starts_at.isoformat()] = price_data + summaries = [] for period in periods: @@ -354,15 +399,44 @@ def _extract_period_summaries(periods: list[list[dict]]) -> list[dict]: if not start_time or not end_time: continue + # Collect interval timestamps + interval_starts = [ + start.isoformat() for interval in period if (start := interval.get("interval_start")) is not None + ] + + # Look up full price data for each interval in the period + period_price_data: list[dict] = [] + for start_iso in interval_starts: + price_data = price_lookup.get(start_iso) + if price_data: + period_price_data.append(price_data) + + # Calculate aggregated level and rating_level + aggregated_level = None + aggregated_rating = None + + if period_price_data: + # Aggregate level (from API's "level" field) + aggregated_level = aggregate_period_levels(period_price_data) + + # Aggregate rating_level (from calculated "rating_level" and "difference" fields) + if threshold_low is not None and threshold_high is not None: + aggregated_rating, _ = aggregate_period_ratings( + period_price_data, + threshold_low, + threshold_high, + ) + summary = { "start": start_time, "end": end_time, "interval_count": len(period), "duration_minutes": len(period) * MINUTES_PER_INTERVAL, # Store interval timestamps for reference (minimal data) - "interval_starts": [ - start.isoformat() for interval in period if (start := interval.get("interval_start")) is not None - ], + "interval_starts": interval_starts, + # Aggregated attributes + "level": aggregated_level, + "rating_level": aggregated_rating, } summaries.append(summary) diff --git a/custom_components/tibber_prices/price_utils.py b/custom_components/tibber_prices/price_utils.py index af169cd..76fcd6e 100644 --- a/custom_components/tibber_prices/price_utils.py +++ b/custom_components/tibber_prices/price_utils.py @@ -345,6 +345,71 @@ def aggregate_price_rating(differences: list[float], threshold_low: float, thres return rating_level or PRICE_RATING_NORMAL, avg_difference +def aggregate_period_levels(interval_data_list: list[dict[str, Any]]) -> str | None: + """ + Aggregate price levels across multiple intervals in a period. + + Extracts "level" from each interval and uses the same logic as + aggregate_price_levels() to determine the overall level for the period. + + Args: + interval_data_list: List of price interval dictionaries with "level" keys + + Returns: + The aggregated level string in lowercase (e.g., "very_cheap", "normal", "expensive"), + or None if no valid levels found + + """ + levels: list[str] = [] + for interval in interval_data_list: + level = interval.get("level") + if level is not None and isinstance(level, str): + levels.append(level) + + if not levels: + return None + + aggregated = aggregate_price_levels(levels) + # Convert to lowercase for consistency with other enum sensors + return aggregated.lower() if aggregated else None + + +def aggregate_period_ratings( + interval_data_list: list[dict[str, Any]], + threshold_low: float, + threshold_high: float, +) -> tuple[str | None, float | None]: + """ + Aggregate price ratings across multiple intervals in a period. + + Extracts "difference" from each interval and uses the same logic as + aggregate_price_rating() to determine the overall rating for the period. + + Args: + interval_data_list: List of price interval dictionaries with "difference" keys + threshold_low: The low threshold percentage for LOW rating + threshold_high: The high threshold percentage for HIGH rating + + Returns: + Tuple of (rating_level, average_difference) + rating_level: "low", "normal", "high" (lowercase), or None if no valid data + average_difference: The averaged difference percentage, or None if no valid data + + """ + differences: list[float] = [] + for interval in interval_data_list: + diff = interval.get("difference") + if diff is not None: + differences.append(float(diff)) + + if not differences: + return None, None + + rating_level, avg_diff = aggregate_price_rating(differences, threshold_low, threshold_high) + # Convert to lowercase for consistency with other enum sensors + return rating_level.lower() if rating_level else None, avg_diff + + def calculate_price_trend( current_price: float, future_average: float,