From 9640b041e072b9334abd0d2ffd17c4ca9a5568ce Mon Sep 17 00:00:00 2001 From: Julian Pawlowski Date: Sun, 9 Nov 2025 23:46:48 +0000 Subject: [PATCH] refactor(periods): move all period logic to coordinator and refactor period_utils MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Moved filter logic and all period attribute calculations from binary_sensor.py to coordinator.py and period_utils.py, following Home Assistant best practices for data flow architecture. ARCHITECTURE CHANGES: Binary Sensor Simplification (~225 lines removed): - Removed _build_periods_summary, _add_price_diff_for_period (calculation logic) - Removed _get_period_intervals_from_price_info (107 lines, interval reconstruction) - Removed _should_show_periods, _check_volatility_filter, _check_level_filter - Removed _build_empty_periods_result (filtering result builder) - Removed _get_price_hours_attributes (24 lines, dead code) - Removed datetime import (unused after cleanup) - New: _build_final_attributes_simple (~20 lines, timestamp-only) - Result: Pure display-only logic, reads pre-calculated data from coordinator Coordinator Enhancement (+160 lines): - Added _should_show_periods(): UND-Verknüpfung of volatility and level filters - Added _check_volatility_filter(): Checks min_volatility threshold - Added _check_level_filter(): Checks min/max level bounds - Enhanced _calculate_periods_for_price_info(): Applies filters before period calculation - Returns empty periods when filters don't match (instead of calculating unnecessarily) - Passes volatility thresholds (moderate/high/very_high) to PeriodConfig Period Utils Refactoring (+110 lines): - Extended PeriodConfig with threshold_volatility_moderate/high/very_high - Added PeriodData NamedTuple: Groups timing data (start, end, length, position) - Added PeriodStatistics NamedTuple: Groups calculated stats (prices, volatility, ratings) - Added ThresholdConfig NamedTuple: Groups all thresholds + reverse_sort flag - New _calculate_period_price_statistics(): Extracts price_avg/min/max/spread calculation - New _build_period_summary_dict(): Builds final dict with correct attribute ordering - Enhanced _extract_period_summaries(): Now calculates ALL attributes (no longer lightweight): * price_avg, price_min, price_max, price_spread (in minor units: ct/øre) * volatility (low/moderate/high/very_high based on absolute thresholds) * rating_difference_% (average of interval differences) * period_price_diff_from_daily_min/max (period avg vs daily reference) * aggregated level and rating_level * period_interval_count (renamed from interval_count for clarity) - Removed interval_starts array (redundant - start/end/count sufficient) - Function signature refactored from 9→4 parameters using NamedTuples Code Organization (HA Best Practice): - Moved calculate_volatility_level() from const.py to price_utils.py - Rule: const.py should contain only constants, no functions - Removed duplicate VOLATILITY_THRESHOLD_* constants from const.py - Updated imports in sensor.py, services.py, period_utils.py DATA FLOW: Before: API → Coordinator (basic enrichment) → Binary Sensor (calculate everything on each access) After: API → Coordinator (enrichment + filtering + period calculation with ALL attributes) → Cached Data → Binary Sensor (display + timestamp only) ATTRIBUTE STRUCTURE: Period summaries now contain (following copilot-instructions.md ordering): 1. Time: start, end, duration_minutes 2. Decision: level, rating_level, rating_difference_% 3. Prices: price_avg, price_min, price_max, price_spread, volatility 4. Differences: period_price_diff_from_daily_min/max (conditional) 5. Details: period_interval_count, period_position 6. Meta: periods_total, periods_remaining BREAKING CHANGES: None - Period data structure enhanced but backwards compatible - Binary sensor API unchanged (state + attributes) Impact: Binary sensors now display pre-calculated data from coordinator instead of calculating on every access. Reduces complexity, improves performance, and centralizes business logic following Home Assistant coordinator pattern. All period filtering (volatility + level) now happens in coordinator before caching. --- .../tibber_prices/binary_sensor.py | 663 ++---------------- custom_components/tibber_prices/const.py | 47 -- .../tibber_prices/coordinator.py | 263 ++++++- .../tibber_prices/period_utils.py | 317 +++++++-- .../tibber_prices/price_utils.py | 55 +- custom_components/tibber_prices/sensor.py | 2 +- custom_components/tibber_prices/services.py | 2 +- 7 files changed, 611 insertions(+), 738 deletions(-) diff --git a/custom_components/tibber_prices/binary_sensor.py b/custom_components/tibber_prices/binary_sensor.py index 67861c1..625940f 100644 --- a/custom_components/tibber_prices/binary_sensor.py +++ b/custom_components/tibber_prices/binary_sensor.py @@ -2,7 +2,6 @@ from __future__ import annotations -from datetime import datetime, timedelta from typing import TYPE_CHECKING from homeassistant.components.binary_sensor import ( @@ -10,7 +9,7 @@ from homeassistant.components.binary_sensor import ( BinarySensorEntity, BinarySensorEntityDescription, ) -from homeassistant.const import PERCENTAGE, EntityCategory +from homeassistant.const import EntityCategory from homeassistant.core import callback from homeassistant.util import dt as dt_util @@ -27,26 +26,8 @@ if TYPE_CHECKING: from .data import TibberPricesConfigEntry from .const import ( - CONF_BEST_PRICE_MAX_LEVEL, - CONF_BEST_PRICE_MIN_VOLATILITY, CONF_EXTENDED_DESCRIPTIONS, - CONF_PEAK_PRICE_MIN_LEVEL, - CONF_PEAK_PRICE_MIN_VOLATILITY, - CONF_VOLATILITY_THRESHOLD_HIGH, - CONF_VOLATILITY_THRESHOLD_MODERATE, - CONF_VOLATILITY_THRESHOLD_VERY_HIGH, - DEFAULT_BEST_PRICE_MAX_LEVEL, - DEFAULT_BEST_PRICE_MIN_VOLATILITY, DEFAULT_EXTENDED_DESCRIPTIONS, - DEFAULT_PEAK_PRICE_MIN_LEVEL, - DEFAULT_PEAK_PRICE_MIN_VOLATILITY, - DEFAULT_VOLATILITY_THRESHOLD_HIGH, - DEFAULT_VOLATILITY_THRESHOLD_MODERATE, - DEFAULT_VOLATILITY_THRESHOLD_VERY_HIGH, - PRICE_LEVEL_MAPPING, - VOLATILITY_HIGH, - VOLATILITY_MODERATE, - VOLATILITY_VERY_HIGH, async_get_entity_description, get_entity_description, ) @@ -239,125 +220,18 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity): period_type = "peak_price" if reverse_sort else "best_price" return periods_data.get(period_type) - def _get_period_intervals_from_price_info(self, period_summaries: list[dict], *, reverse_sort: bool) -> list[dict]: - """ - Build full interval data from period summaries and priceInfo. - - This avoids storing price data redundantly by fetching it on-demand from priceInfo. - """ - if not self.coordinator.data or not period_summaries: - return [] - - price_info = self.coordinator.data.get("priceInfo", {}) - yesterday = price_info.get("yesterday", []) - today = price_info.get("today", []) - tomorrow = price_info.get("tomorrow", []) - - # Build a quick lookup for prices by timestamp - all_prices = yesterday + today + tomorrow - price_lookup = {} - for price_data in all_prices: - starts_at = dt_util.parse_datetime(price_data["startsAt"]) - if starts_at: - starts_at = dt_util.as_local(starts_at) - price_lookup[starts_at.isoformat()] = price_data - - # Get reference data for annotations - period_data = self._get_precomputed_period_data(reverse_sort=reverse_sort) - if not period_data: - return [] - - ref_data = period_data.get("reference_data", {}) - ref_prices = ref_data.get("ref_prices", {}) - avg_prices = ref_data.get("avg_prices", {}) - - # Build annotated intervals from period summaries - intervals = [] - period_count = len(period_summaries) - - for period_idx, period_summary in enumerate(period_summaries, 1): - period_start = period_summary.get("start") - period_end = period_summary.get("end") - interval_starts = period_summary.get("interval_starts", []) - interval_count = len(interval_starts) - duration_minutes = period_summary.get("duration_minutes", 0) - periods_remaining = period_count - period_idx - - for interval_idx, start_iso in enumerate(interval_starts, 1): - # Get price data from priceInfo - price_data = price_lookup.get(start_iso) - if not price_data: - continue - - starts_at = dt_util.parse_datetime(price_data["startsAt"]) - if not starts_at: - continue - starts_at = dt_util.as_local(starts_at) - date_key = starts_at.date().isoformat() - - price_raw = float(price_data["total"]) - price_minor = round(price_raw * 100, 2) - - # Get reference values for this day - ref_price = ref_prices.get(date_key, 0.0) - avg_price = avg_prices.get(date_key, 0.0) - - # Calculate price difference - price_diff = price_raw - ref_price - price_diff_minor = round(price_diff * 100, 2) - price_diff_pct = (price_diff / ref_price) * 100 if ref_price != 0 else 0.0 - - interval_remaining = interval_count - interval_idx - interval_end = starts_at + timedelta(minutes=MINUTES_PER_INTERVAL) - - annotated = { - # Period-level attributes - "period_start": period_start, - "period_end": period_end, - "hour": period_start.hour if period_start else None, - "minute": period_start.minute if period_start else None, - "time": f"{period_start.hour:02d}:{period_start.minute:02d}" if period_start else None, - "duration_minutes": duration_minutes, - "remaining_minutes_in_period": interval_remaining * MINUTES_PER_INTERVAL, - "periods_total": period_count, - "periods_remaining": periods_remaining, - "period_position": period_idx, - # Interval-level attributes - "price": price_minor, - # Internal fields - "_interval_start": starts_at, - "_interval_end": interval_end, - "_ref_price": ref_price, - "_avg_price": avg_price, - } - - # Add price difference attributes based on sensor type - if reverse_sort: - annotated["price_diff_from_max"] = price_diff_minor - annotated[f"price_diff_from_max_{PERCENTAGE}"] = round(price_diff_pct, 2) - else: - annotated["price_diff_from_min"] = price_diff_minor - annotated[f"price_diff_from_min_{PERCENTAGE}"] = round(price_diff_pct, 2) - - intervals.append(annotated) - - return intervals - def _get_price_intervals_attributes(self, *, reverse_sort: bool) -> dict | None: """ Get price interval attributes using precomputed data from coordinator. - This method now: - 1. Gets lightweight period summaries from coordinator - 2. Fetches actual price data from priceInfo on-demand - 3. Builds annotations without storing data redundantly - 4. Filters periods based on volatility and level thresholds if configured - """ - # Check if periods should be filtered based on volatility and level - if not self._should_show_periods(reverse_sort=reverse_sort): - return self._build_empty_periods_result(reverse_sort=reverse_sort) + All data is already calculated in the coordinator - we just need to: + 1. Get period summaries from coordinator (already filtered and fully calculated) + 2. Add the current timestamp + 3. Find current or next period based on time - # Get precomputed period summaries from coordinator + Note: All calculations (filtering, aggregations, level/rating) are done in coordinator. + """ + # Get precomputed period summaries from coordinator (already filtered and complete!) period_data = self._get_precomputed_period_data(reverse_sort=reverse_sort) if not period_data: return self._build_no_periods_result() @@ -366,160 +240,28 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity): if not period_summaries: return self._build_no_periods_result() - # Build full interval data from summaries + priceInfo - intervals = self._get_period_intervals_from_price_info(period_summaries, reverse_sort=reverse_sort) - if not intervals: - return self._build_no_periods_result() + # Find current or next period based on current time + now = dt_util.now() + current_period = None - # Find current or next interval - current_interval = self._find_current_or_next_interval(intervals) + # First pass: find currently active period + for period in period_summaries: + start = period.get("start") + end = period.get("end") + if start and end and start <= now < end: + current_period = period + break - # Build periods summary (merge with original summaries to include level/rating_level) - periods_summary = self._build_periods_summary(intervals, period_summaries) + # Second pass: find next future period if none is active + if not current_period: + for period in period_summaries: + start = period.get("start") + if start and start > now: + current_period = period + break # Build final attributes - return self._build_final_attributes(current_interval, periods_summary, intervals) - - def _should_show_periods(self, *, reverse_sort: bool) -> bool: - """ - Check if periods should be shown based on volatility AND level filters (UND-Verknüpfung). - - Args: - reverse_sort: If False (best_price), checks max_level filter. - If True (peak_price), checks min_level filter. - - Returns: - True if periods should be displayed, False if they should be filtered out. - Both conditions must be met for periods to be shown. - - """ - if not self.coordinator.data: - return True - - # Check volatility filter - if not self._check_volatility_filter(reverse_sort=reverse_sort): - return False - - # Check level filter (UND-Verknüpfung) - return self._check_level_filter(reverse_sort=reverse_sort) - - def _check_volatility_filter(self, *, reverse_sort: bool) -> bool: - """ - Check if today's volatility meets the minimum requirement. - - Args: - reverse_sort: If False (best_price), uses CONF_BEST_PRICE_MIN_VOLATILITY. - If True (peak_price), uses CONF_PEAK_PRICE_MIN_VOLATILITY. - - """ - # Get appropriate volatility config based on sensor type - if reverse_sort: - # Peak price sensor - min_volatility = self.coordinator.config_entry.options.get( - CONF_PEAK_PRICE_MIN_VOLATILITY, - DEFAULT_PEAK_PRICE_MIN_VOLATILITY, - ) - else: - # Best price sensor - min_volatility = self.coordinator.config_entry.options.get( - CONF_BEST_PRICE_MIN_VOLATILITY, - DEFAULT_BEST_PRICE_MIN_VOLATILITY, - ) - - # "low" means no filtering (show at any volatility ≥0ct) - if min_volatility == "low": - return True - - # "any" is legacy alias for "low" (no filtering) - if min_volatility == "any": - return True - - # Get today's price data to calculate volatility - price_info = self.coordinator.data.get("priceInfo", {}) - today_prices = price_info.get("today", []) - prices = [p.get("total") for p in today_prices if "total" in p] if today_prices else [] - - if not prices: - return True # If no prices, don't filter - - # Calculate today's spread (volatility metric) in minor units - spread_major = (max(prices) - min(prices)) * 100 - - # Get volatility thresholds from config - threshold_moderate = self.coordinator.config_entry.options.get( - CONF_VOLATILITY_THRESHOLD_MODERATE, - DEFAULT_VOLATILITY_THRESHOLD_MODERATE, - ) - threshold_high = self.coordinator.config_entry.options.get( - CONF_VOLATILITY_THRESHOLD_HIGH, - DEFAULT_VOLATILITY_THRESHOLD_HIGH, - ) - threshold_very_high = self.coordinator.config_entry.options.get( - CONF_VOLATILITY_THRESHOLD_VERY_HIGH, - DEFAULT_VOLATILITY_THRESHOLD_VERY_HIGH, - ) - - # Map min_volatility to threshold and check if spread meets requirement - threshold_map = { - VOLATILITY_MODERATE: threshold_moderate, - VOLATILITY_HIGH: threshold_high, - VOLATILITY_VERY_HIGH: threshold_very_high, - } - - required_threshold = threshold_map.get(min_volatility) - return spread_major >= required_threshold if required_threshold is not None else True - - def _check_level_filter(self, *, reverse_sort: bool) -> bool: - """ - Check if today has any intervals that meet the level requirement. - - Args: - reverse_sort: If False (best_price), checks max_level (upper bound filter). - If True (peak_price), checks min_level (lower bound filter). - - Returns: - True if ANY interval meets the level requirement, False otherwise. - - """ - # Get appropriate config based on sensor type - if reverse_sort: - # Peak price: minimum level filter (lower bound) - level_config = self.coordinator.config_entry.options.get( - CONF_PEAK_PRICE_MIN_LEVEL, - DEFAULT_PEAK_PRICE_MIN_LEVEL, - ) - else: - # Best price: maximum level filter (upper bound) - level_config = self.coordinator.config_entry.options.get( - CONF_BEST_PRICE_MAX_LEVEL, - DEFAULT_BEST_PRICE_MAX_LEVEL, - ) - - # "any" means no level filtering - if level_config == "any": - return True - - # Get today's intervals - price_info = self.coordinator.data.get("priceInfo", {}) - today_intervals = price_info.get("today", []) - - if not today_intervals: - return True # If no data, don't filter - - # Check if ANY interval today meets the level requirement - # Note: level_config is lowercase from selector, but PRICE_LEVEL_MAPPING uses uppercase - level_order = PRICE_LEVEL_MAPPING.get(level_config.upper(), 0) - - if reverse_sort: - # Peak price: level >= min_level (show if ANY interval is expensive enough) - return any( - PRICE_LEVEL_MAPPING.get(interval.get("level", "NORMAL"), 0) >= level_order - for interval in today_intervals - ) - # Best price: level <= max_level (show if ANY interval is cheap enough) - return any( - PRICE_LEVEL_MAPPING.get(interval.get("level", "NORMAL"), 0) <= level_order for interval in today_intervals - ) + return self._build_final_attributes_simple(current_period, period_summaries) def _build_no_periods_result(self) -> dict: """ @@ -541,350 +283,45 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity): "periods": [], } - def _build_empty_periods_result(self, *, reverse_sort: bool) -> dict: - """ - Build result when periods are filtered due to volatility or level constraints. - - Args: - reverse_sort: If False (best_price), reports max_level filter. - If True (peak_price), reports min_level filter. - - Returns: - A dict with empty periods and a reason attribute explaining why. - - """ - # Get appropriate volatility config based on sensor type - if reverse_sort: - min_volatility = self.coordinator.config_entry.options.get( - CONF_PEAK_PRICE_MIN_VOLATILITY, - DEFAULT_PEAK_PRICE_MIN_VOLATILITY, - ) - else: - min_volatility = self.coordinator.config_entry.options.get( - CONF_BEST_PRICE_MIN_VOLATILITY, - DEFAULT_BEST_PRICE_MIN_VOLATILITY, - ) - - # Get appropriate level config based on sensor type - if reverse_sort: - level_config = self.coordinator.config_entry.options.get( - CONF_PEAK_PRICE_MIN_LEVEL, - DEFAULT_PEAK_PRICE_MIN_LEVEL, - ) - level_filter_type = "below" # Peak price: level below min threshold - else: - level_config = self.coordinator.config_entry.options.get( - CONF_BEST_PRICE_MAX_LEVEL, - DEFAULT_BEST_PRICE_MAX_LEVEL, - ) - level_filter_type = "above" # Best price: level above max threshold - - # Build reason string explaining which filter(s) prevented display - reasons = [] - if min_volatility != "any" and not self._check_volatility_filter(reverse_sort=reverse_sort): - reasons.append(f"volatility_below_{min_volatility}") - if level_config != "any" and not self._check_level_filter(reverse_sort=reverse_sort): - reasons.append(f"level_{level_filter_type}_{level_config}") - - # Join multiple reasons with "and" - reason = "_and_".join(reasons) if reasons else "filtered" - - # Calculate timestamp: current time rounded down to last quarter hour - now = dt_util.now() - current_minute = (now.minute // 15) * 15 - timestamp = now.replace(minute=current_minute, second=0, microsecond=0) - - return { - "timestamp": timestamp, - "start": None, - "end": None, - "periods": [], - "reason": reason, - } - - def _find_current_or_next_interval(self, intervals: list[dict]) -> dict | None: - """Find the current or next interval from the filtered list.""" - now = dt_util.now() - # First pass: find currently active interval - for interval in intervals: - start = interval.get("_interval_start") - end = interval.get("_interval_end") - if start and end and start <= now < end: - return interval.copy() - # Second pass: find next future interval - for interval in intervals: - start = interval.get("_interval_start") - if start and start > now: - return interval.copy() - return None - - def _build_periods_summary(self, intervals: list[dict], original_summaries: list[dict]) -> list[dict]: - """ - Build a summary of periods with consistent attribute structure. - - Returns a list of period summaries with the same attributes as top-level, - making the structure predictable and easy to use in automations. - - Args: - intervals: List of interval dictionaries with period information - original_summaries: Original period summaries from coordinator (with level/rating_level) - - """ - if not intervals: - return [] - - # Build a lookup for original summaries by start time - original_lookup: dict[str, dict] = {} - for summary in original_summaries: - start = summary.get("start") - if start: - key = start.isoformat() if hasattr(start, "isoformat") else str(start) - original_lookup[key] = summary - - # Group intervals by period (they have the same period_start) - periods_dict: dict[str, list[dict]] = {} - for interval in intervals: - period_key = interval.get("period_start") - if period_key: - key_str = period_key.isoformat() if hasattr(period_key, "isoformat") else str(period_key) - if key_str not in periods_dict: - periods_dict[key_str] = [] - periods_dict[key_str].append(interval) - - # Build summary for each period with consistent attribute names - summaries = [] - for period_intervals in periods_dict.values(): - if not period_intervals: - continue - - first = period_intervals[0] - prices = [i["price"] for i in period_intervals if "price" in i] - - # Get level and rating_level from original summaries first - aggregated_level = None - aggregated_rating_level = None - period_start = first.get("period_start") - if period_start: - key = period_start.isoformat() if hasattr(period_start, "isoformat") else str(period_start) - original = original_lookup.get(key) - if original: - aggregated_level = original.get("level") - aggregated_rating_level = original.get("rating_level") - - # Follow attribute ordering from copilot-instructions.md - summary = { - "start": first.get("period_start"), - "end": first.get("period_end"), - "duration_minutes": first.get("duration_minutes"), - "level": aggregated_level, - "rating_level": aggregated_rating_level, - "price_avg": round(sum(prices) / len(prices), 2) if prices else 0, - "price_min": round(min(prices), 2) if prices else 0, - "price_max": round(max(prices), 2) if prices else 0, - "price_spread": round(max(prices) - min(prices), 2) if prices else 0, - "hour": first.get("hour"), - "minute": first.get("minute"), - "time": first.get("time"), - "periods_total": first.get("periods_total"), - "periods_remaining": first.get("periods_remaining"), - "period_position": first.get("period_position"), - "interval_count": len(period_intervals), - } - - # Add price_diff attributes if present (price differences step 4) - self._add_price_diff_for_period(summary, period_intervals, first) - - summaries.append(summary) - - return summaries - - def _build_final_attributes( + def _build_final_attributes_simple( self, - current_interval: dict | None, - periods_summary: list[dict], - filtered_result: list[dict], + current_period: dict | None, + period_summaries: list[dict], ) -> dict: """ - Build the final attributes dictionary from period summary and current interval. + Build the final attributes dictionary from coordinator's period summaries. + + All calculations are done in the coordinator - this just: + 1. Adds the current timestamp (only thing calculated every 15min) + 2. Uses the current/next period from summaries + 3. Adds nested period summaries + + Args: + current_period: The current or next period (already complete from coordinator) + period_summaries: All period summaries from coordinator - Combines period-level attributes with current interval-specific attributes, - ensuring price_diff reflects the current interval's position vs daily min/max. """ now = dt_util.now() current_minute = (now.minute // 15) * 15 timestamp = now.replace(minute=current_minute, second=0, microsecond=0) - if current_interval and periods_summary: - # Find the current period in the summary based on period_start - current_period_start = current_interval.get("period_start") - current_period_summary = None - - for period in periods_summary: - if period.get("start") == current_period_start: - current_period_summary = period - break - - if current_period_summary: - # Follow attribute ordering from copilot-instructions.md - attributes = { - "timestamp": timestamp, - "start": current_period_summary.get("start"), - "end": current_period_summary.get("end"), - "duration_minutes": current_period_summary.get("duration_minutes"), - "level": current_period_summary.get("level"), - "rating_level": current_period_summary.get("rating_level"), - "price_avg": current_period_summary.get("price_avg"), - "price_min": current_period_summary.get("price_min"), - "price_max": current_period_summary.get("price_max"), - "price_spread": current_period_summary.get("price_spread"), - "hour": current_period_summary.get("hour"), - "minute": current_period_summary.get("minute"), - "time": current_period_summary.get("time"), - "periods_total": current_period_summary.get("periods_total"), - "periods_remaining": current_period_summary.get("periods_remaining"), - "period_position": current_period_summary.get("period_position"), - "interval_count": current_period_summary.get("interval_count"), - } - - # Add period price_diff attributes if present - if "period_price_diff_from_daily_min" in current_period_summary: - attributes["period_price_diff_from_daily_min"] = current_period_summary[ - "period_price_diff_from_daily_min" - ] - if "period_price_diff_from_daily_min_%" in current_period_summary: - attributes["period_price_diff_from_daily_min_%"] = current_period_summary[ - "period_price_diff_from_daily_min_%" - ] - elif "period_price_diff_from_daily_max" in current_period_summary: - attributes["period_price_diff_from_daily_max"] = current_period_summary[ - "period_price_diff_from_daily_max" - ] - if "period_price_diff_from_daily_max_%" in current_period_summary: - attributes["period_price_diff_from_daily_max_%"] = current_period_summary[ - "period_price_diff_from_daily_max_%" - ] - - # Add interval-specific price_diff attributes (separate from period average) - # Shows the reference interval's position vs daily min/max: - # - If period is active: current 15-min interval vs daily min/max - # - If period hasn't started: first interval of the period vs daily min/max - # This value is what determines if an interval is part of a period (compared to flex setting) - if "price_diff_from_min" in current_interval: - attributes["interval_price_diff_from_daily_min"] = current_interval["price_diff_from_min"] - attributes["interval_price_diff_from_daily_min_%"] = current_interval.get("price_diff_from_min_%") - elif "price_diff_from_max" in current_interval: - attributes["interval_price_diff_from_daily_max"] = current_interval["price_diff_from_max"] - attributes["interval_price_diff_from_daily_max_%"] = current_interval.get("price_diff_from_max_%") - - # Nested structures last (meta information step 6) - attributes["periods"] = periods_summary - return attributes - - # Fallback if current period not found in summary - return { - "timestamp": timestamp, - "periods": periods_summary, - "interval_count": len(filtered_result), + if current_period: + # Start with complete period summary from coordinator (already has all attributes!) + attributes = { + "timestamp": timestamp, # ONLY thing we calculate here! + **current_period, # All other attributes come from coordinator } - # No periods found + # Add nested period summaries last (meta information) + attributes["periods"] = period_summaries + return attributes + + # No current/next period found - return all periods with timestamp return { "timestamp": timestamp, - "periods": [], - "interval_count": 0, + "periods": period_summaries, } - def _add_price_diff_for_period(self, summary: dict, period_intervals: list[dict], first: dict) -> None: - """ - Add price difference attributes for the period based on sensor type. - - Uses the reference price (min/max) from the start day of the period to ensure - consistent comparison, especially for periods spanning midnight. - - Calculates how the period's average price compares to the daily min/max, - helping to explain why the period qualifies based on flex settings. - """ - # Determine sensor type and get the reference price from the first interval - # (which represents the start of the period and its day's reference value) - if "price_diff_from_min" in first: - # Best price sensor: calculate difference from the period's start day minimum - period_start = first.get("period_start") - if not period_start: - return - - # Get all prices in minor units (cents/øre) from the period - prices = [i["price"] for i in period_intervals if "price" in i] - if not prices: - return - - period_avg_price = sum(prices) / len(prices) - - # Extract the reference min price from first interval's calculation - # We can back-calculate it from the first interval's price and diff - first_price_minor = first.get("price") - first_diff_minor = first.get("price_diff_from_min") - - if first_price_minor is not None and first_diff_minor is not None: - ref_min_price = first_price_minor - first_diff_minor - period_diff = period_avg_price - ref_min_price - - # Period average price difference from daily minimum - summary["period_price_diff_from_daily_min"] = round(period_diff, 2) - if ref_min_price != 0: - period_diff_pct = (period_diff / ref_min_price) * 100 - summary["period_price_diff_from_daily_min_%"] = round(period_diff_pct, 2) - - elif "price_diff_from_max" in first: - # Peak price sensor: calculate difference from the period's start day maximum - period_start = first.get("period_start") - if not period_start: - return - - # Get all prices in minor units (cents/øre) from the period - prices = [i["price"] for i in period_intervals if "price" in i] - if not prices: - return - - period_avg_price = sum(prices) / len(prices) - - # Extract the reference max price from first interval's calculation - first_price_minor = first.get("price") - first_diff_minor = first.get("price_diff_from_max") - - if first_price_minor is not None and first_diff_minor is not None: - ref_max_price = first_price_minor - first_diff_minor - period_diff = period_avg_price - ref_max_price - - # Period average price difference from daily maximum - summary["period_price_diff_from_daily_max"] = round(period_diff, 2) - if ref_max_price != 0: - period_diff_pct = (period_diff / ref_max_price) * 100 - summary["period_price_diff_from_daily_max_%"] = round(period_diff_pct, 2) - - def _get_price_hours_attributes(self, *, attribute_name: str, reverse_sort: bool) -> dict | None: - """Get price hours attributes.""" - if not self.coordinator.data: - return None - - price_info = self.coordinator.data.get("priceInfo", {}) - - today_prices = price_info.get("today", []) - if not today_prices: - return None - - prices = [ - ( - datetime.fromisoformat(price["startsAt"]).hour, - float(price["total"]), - ) - for price in today_prices - ] - - # Sort by price (high to low for peak, low to high for best) - sorted_hours = sorted(prices, key=lambda x: x[1], reverse=reverse_sort)[:5] - - return {attribute_name: [{"hour": hour, "price": price} for hour, price in sorted_hours]} - @property def is_on(self) -> bool | None: """Return true if the binary_sensor is on.""" diff --git a/custom_components/tibber_prices/const.py b/custom_components/tibber_prices/const.py index 626037f..9062cda 100644 --- a/custom_components/tibber_prices/const.py +++ b/custom_components/tibber_prices/const.py @@ -133,48 +133,6 @@ def format_price_unit_minor(currency_code: str | None) -> str: return f"{minor_symbol}/{UnitOfPower.KILO_WATT}{UnitOfTime.HOURS}" -def calculate_volatility_level( - spread: float, - threshold_moderate: float | None = None, - threshold_high: float | None = None, - threshold_very_high: float | None = None, -) -> str: - """ - Calculate volatility level from price spread. - - Volatility indicates how much prices fluctuate during a period, which helps - determine whether active load shifting is worthwhile. - - Args: - spread: Absolute price difference between max and min (in minor currency units, e.g., ct or øre) - threshold_moderate: Custom threshold for MODERATE level (default: use VOLATILITY_THRESHOLD_MODERATE) - threshold_high: Custom threshold for HIGH level (default: use VOLATILITY_THRESHOLD_HIGH) - threshold_very_high: Custom threshold for VERY_HIGH level (default: use VOLATILITY_THRESHOLD_VERY_HIGH) - - Returns: - Volatility level: LOW, MODERATE, HIGH, or VERY_HIGH - - Examples: - - spread < 5: LOW → minimal optimization potential - - 5 ≤ spread < 15: MODERATE → some optimization worthwhile - - 15 ≤ spread < 30: HIGH → strong optimization recommended - - spread ≥ 30: VERY_HIGH → maximum optimization potential - - """ - # Use provided thresholds or fall back to constants - t_moderate = threshold_moderate if threshold_moderate is not None else VOLATILITY_THRESHOLD_MODERATE - t_high = threshold_high if threshold_high is not None else VOLATILITY_THRESHOLD_HIGH - t_very_high = threshold_very_high if threshold_very_high is not None else VOLATILITY_THRESHOLD_VERY_HIGH - - if spread < t_moderate: - return VOLATILITY_LOW - if spread < t_high: - return VOLATILITY_MODERATE - if spread < t_very_high: - return VOLATILITY_HIGH - return VOLATILITY_VERY_HIGH - - # Price level constants from Tibber API PRICE_LEVEL_VERY_CHEAP = "VERY_CHEAP" PRICE_LEVEL_CHEAP = "CHEAP" @@ -193,11 +151,6 @@ VOLATILITY_MODERATE = "MODERATE" VOLATILITY_HIGH = "HIGH" VOLATILITY_VERY_HIGH = "VERY_HIGH" -# Volatility thresholds (in minor currency units like ct or øre) -VOLATILITY_THRESHOLD_MODERATE = 5 # Below this: LOW, above: MODERATE -VOLATILITY_THRESHOLD_HIGH = 15 # Below this: MODERATE, above: HIGH -VOLATILITY_THRESHOLD_VERY_HIGH = 30 # Below this: HIGH, above: VERY_HIGH - # Sensor options (lowercase versions for ENUM device class) # NOTE: These constants define the valid enum options, but they are not used directly # in sensor.py due to import timing issues. Instead, the options are defined inline diff --git a/custom_components/tibber_prices/coordinator.py b/custom_components/tibber_prices/coordinator.py index fb461f2..c77a403 100644 --- a/custom_components/tibber_prices/coordinator.py +++ b/custom_components/tibber_prices/coordinator.py @@ -26,22 +26,40 @@ from .api import ( ) from .const import ( CONF_BEST_PRICE_FLEX, + CONF_BEST_PRICE_MAX_LEVEL, CONF_BEST_PRICE_MIN_DISTANCE_FROM_AVG, CONF_BEST_PRICE_MIN_PERIOD_LENGTH, + CONF_BEST_PRICE_MIN_VOLATILITY, CONF_PEAK_PRICE_FLEX, CONF_PEAK_PRICE_MIN_DISTANCE_FROM_AVG, + CONF_PEAK_PRICE_MIN_LEVEL, CONF_PEAK_PRICE_MIN_PERIOD_LENGTH, + CONF_PEAK_PRICE_MIN_VOLATILITY, CONF_PRICE_RATING_THRESHOLD_HIGH, CONF_PRICE_RATING_THRESHOLD_LOW, + CONF_VOLATILITY_THRESHOLD_HIGH, + CONF_VOLATILITY_THRESHOLD_MODERATE, + CONF_VOLATILITY_THRESHOLD_VERY_HIGH, DEFAULT_BEST_PRICE_FLEX, + DEFAULT_BEST_PRICE_MAX_LEVEL, DEFAULT_BEST_PRICE_MIN_DISTANCE_FROM_AVG, DEFAULT_BEST_PRICE_MIN_PERIOD_LENGTH, + DEFAULT_BEST_PRICE_MIN_VOLATILITY, DEFAULT_PEAK_PRICE_FLEX, DEFAULT_PEAK_PRICE_MIN_DISTANCE_FROM_AVG, + DEFAULT_PEAK_PRICE_MIN_LEVEL, DEFAULT_PEAK_PRICE_MIN_PERIOD_LENGTH, + DEFAULT_PEAK_PRICE_MIN_VOLATILITY, DEFAULT_PRICE_RATING_THRESHOLD_HIGH, DEFAULT_PRICE_RATING_THRESHOLD_LOW, + DEFAULT_VOLATILITY_THRESHOLD_HIGH, + DEFAULT_VOLATILITY_THRESHOLD_MODERATE, + DEFAULT_VOLATILITY_THRESHOLD_VERY_HIGH, DOMAIN, + PRICE_LEVEL_MAPPING, + VOLATILITY_HIGH, + VOLATILITY_MODERATE, + VOLATILITY_VERY_HIGH, ) from .period_utils import PeriodConfig, calculate_periods from .price_utils import ( @@ -717,27 +735,160 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]): "min_period_length": int(min_period_length), } + def _should_show_periods(self, price_info: dict[str, Any], *, reverse_sort: bool) -> bool: + """ + Check if periods should be shown based on volatility AND level filters (UND-Verknüpfung). + + Args: + price_info: Price information dict with today/yesterday/tomorrow data + reverse_sort: If False (best_price), checks max_level filter. + If True (peak_price), checks min_level filter. + + Returns: + True if periods should be displayed, False if they should be filtered out. + Both conditions must be met for periods to be shown. + + """ + # Check volatility filter + if not self._check_volatility_filter(price_info, reverse_sort=reverse_sort): + return False + + # Check level filter (UND-Verknüpfung) + return self._check_level_filter(price_info, reverse_sort=reverse_sort) + + def _check_volatility_filter(self, price_info: dict[str, Any], *, reverse_sort: bool) -> bool: + """ + Check if today's volatility meets the minimum requirement. + + Args: + price_info: Price information dict with today data + reverse_sort: If False (best_price), uses CONF_BEST_PRICE_MIN_VOLATILITY. + If True (peak_price), uses CONF_PEAK_PRICE_MIN_VOLATILITY. + + Returns: + True if volatility requirement met, False if periods should be filtered out. + + """ + # Get appropriate volatility config based on sensor type + if reverse_sort: + # Peak price sensor + min_volatility = self.config_entry.options.get( + CONF_PEAK_PRICE_MIN_VOLATILITY, + DEFAULT_PEAK_PRICE_MIN_VOLATILITY, + ) + else: + # Best price sensor + min_volatility = self.config_entry.options.get( + CONF_BEST_PRICE_MIN_VOLATILITY, + DEFAULT_BEST_PRICE_MIN_VOLATILITY, + ) + + # "low" means no filtering (show at any volatility ≥0ct) + if min_volatility == "low": + return True + + # "any" is legacy alias for "low" (no filtering) + if min_volatility == "any": + return True + + # Get today's price data to calculate volatility + today_prices = price_info.get("today", []) + prices = [p.get("total") for p in today_prices if "total" in p] if today_prices else [] + + if not prices: + return True # If no prices, don't filter + + # Calculate today's spread (volatility metric) in minor units + spread_major = (max(prices) - min(prices)) * 100 + + # Get volatility thresholds from config + threshold_moderate = self.config_entry.options.get( + CONF_VOLATILITY_THRESHOLD_MODERATE, + DEFAULT_VOLATILITY_THRESHOLD_MODERATE, + ) + threshold_high = self.config_entry.options.get( + CONF_VOLATILITY_THRESHOLD_HIGH, + DEFAULT_VOLATILITY_THRESHOLD_HIGH, + ) + threshold_very_high = self.config_entry.options.get( + CONF_VOLATILITY_THRESHOLD_VERY_HIGH, + DEFAULT_VOLATILITY_THRESHOLD_VERY_HIGH, + ) + + # Map min_volatility to threshold and check if spread meets requirement + threshold_map = { + VOLATILITY_MODERATE: threshold_moderate, + VOLATILITY_HIGH: threshold_high, + VOLATILITY_VERY_HIGH: threshold_very_high, + } + + required_threshold = threshold_map.get(min_volatility) + return spread_major >= required_threshold if required_threshold is not None else True + + def _check_level_filter(self, price_info: dict[str, Any], *, reverse_sort: bool) -> bool: + """ + Check if today has any intervals that meet the level requirement. + + Args: + price_info: Price information dict with today data + reverse_sort: If False (best_price), checks max_level (upper bound filter). + If True (peak_price), checks min_level (lower bound filter). + + Returns: + True if ANY interval meets the level requirement, False otherwise. + + """ + # Get appropriate config based on sensor type + if reverse_sort: + # Peak price: minimum level filter (lower bound) + level_config = self.config_entry.options.get( + CONF_PEAK_PRICE_MIN_LEVEL, + DEFAULT_PEAK_PRICE_MIN_LEVEL, + ) + else: + # Best price: maximum level filter (upper bound) + level_config = self.config_entry.options.get( + CONF_BEST_PRICE_MAX_LEVEL, + DEFAULT_BEST_PRICE_MAX_LEVEL, + ) + + # "any" means no level filtering + if level_config == "any": + return True + + # Get today's intervals + today_intervals = price_info.get("today", []) + + if not today_intervals: + return True # If no data, don't filter + + # Check if ANY interval today meets the level requirement + # Note: level_config is lowercase from selector, but PRICE_LEVEL_MAPPING uses uppercase + level_order = PRICE_LEVEL_MAPPING.get(level_config.upper(), 0) + + if reverse_sort: + # Peak price: level >= min_level (show if ANY interval is expensive enough) + return any( + PRICE_LEVEL_MAPPING.get(interval.get("level", "NORMAL"), 0) >= level_order + for interval in today_intervals + ) + # Best price: level <= max_level (show if ANY interval is cheap enough) + return any( + PRICE_LEVEL_MAPPING.get(interval.get("level", "NORMAL"), 0) <= level_order for interval in today_intervals + ) + def _calculate_periods_for_price_info(self, price_info: dict[str, Any]) -> dict[str, Any]: - """Calculate periods (best price and peak price) for the given price info.""" + """ + Calculate periods (best price and peak price) for the given price info. + + Applies volatility and level filtering based on user configuration. + If filters don't match, returns empty period lists. + """ yesterday_prices = price_info.get("yesterday", []) today_prices = price_info.get("today", []) tomorrow_prices = price_info.get("tomorrow", []) all_prices = yesterday_prices + today_prices + tomorrow_prices - if not all_prices: - return { - "best_price": { - "periods": [], - "intervals": [], - "metadata": {"total_intervals": 0, "total_periods": 0, "config": {}}, - }, - "peak_price": { - "periods": [], - "intervals": [], - "metadata": {"total_intervals": 0, "total_periods": 0, "config": {}}, - }, - } - # Get rating thresholds from config threshold_low = self.config_entry.options.get( CONF_PRICE_RATING_THRESHOLD_LOW, @@ -748,29 +899,69 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]): DEFAULT_PRICE_RATING_THRESHOLD_HIGH, ) - # Calculate best price periods - best_config = self._get_period_config(reverse_sort=False) - best_period_config = PeriodConfig( - reverse_sort=False, - flex=best_config["flex"], - min_distance_from_avg=best_config["min_distance_from_avg"], - min_period_length=best_config["min_period_length"], - threshold_low=threshold_low, - threshold_high=threshold_high, + # Get volatility thresholds from config + threshold_volatility_moderate = self.config_entry.options.get( + CONF_VOLATILITY_THRESHOLD_MODERATE, + DEFAULT_VOLATILITY_THRESHOLD_MODERATE, + ) + threshold_volatility_high = self.config_entry.options.get( + CONF_VOLATILITY_THRESHOLD_HIGH, + DEFAULT_VOLATILITY_THRESHOLD_HIGH, + ) + threshold_volatility_very_high = self.config_entry.options.get( + CONF_VOLATILITY_THRESHOLD_VERY_HIGH, + DEFAULT_VOLATILITY_THRESHOLD_VERY_HIGH, ) - best_periods = calculate_periods(all_prices, config=best_period_config) - # Calculate peak price periods - peak_config = self._get_period_config(reverse_sort=True) - peak_period_config = PeriodConfig( - reverse_sort=True, - flex=peak_config["flex"], - min_distance_from_avg=peak_config["min_distance_from_avg"], - min_period_length=peak_config["min_period_length"], - threshold_low=threshold_low, - threshold_high=threshold_high, - ) - peak_periods = calculate_periods(all_prices, config=peak_period_config) + # Check if best price periods should be shown (apply filters) + show_best_price = self._should_show_periods(price_info, reverse_sort=False) if all_prices else False + + # Calculate best price periods (or return empty if filtered) + if show_best_price: + best_config = self._get_period_config(reverse_sort=False) + best_period_config = PeriodConfig( + reverse_sort=False, + flex=best_config["flex"], + min_distance_from_avg=best_config["min_distance_from_avg"], + min_period_length=best_config["min_period_length"], + threshold_low=threshold_low, + threshold_high=threshold_high, + threshold_volatility_moderate=threshold_volatility_moderate, + threshold_volatility_high=threshold_volatility_high, + threshold_volatility_very_high=threshold_volatility_very_high, + ) + best_periods = calculate_periods(all_prices, config=best_period_config) + else: + best_periods = { + "periods": [], + "intervals": [], + "metadata": {"total_intervals": 0, "total_periods": 0, "config": {}}, + } + + # Check if peak price periods should be shown (apply filters) + show_peak_price = self._should_show_periods(price_info, reverse_sort=True) if all_prices else False + + # Calculate peak price periods (or return empty if filtered) + if show_peak_price: + peak_config = self._get_period_config(reverse_sort=True) + peak_period_config = PeriodConfig( + reverse_sort=True, + flex=peak_config["flex"], + min_distance_from_avg=peak_config["min_distance_from_avg"], + min_period_length=peak_config["min_period_length"], + threshold_low=threshold_low, + threshold_high=threshold_high, + threshold_volatility_moderate=threshold_volatility_moderate, + threshold_volatility_high=threshold_volatility_high, + threshold_volatility_very_high=threshold_volatility_very_high, + ) + peak_periods = calculate_periods(all_prices, config=peak_period_config) + else: + peak_periods = { + "periods": [], + "intervals": [], + "metadata": {"total_intervals": 0, "total_periods": 0, "config": {}}, + } return { "best_price": best_periods, diff --git a/custom_components/tibber_prices/period_utils.py b/custom_components/tibber_prices/period_utils.py index 0386524..9e44eb5 100644 --- a/custom_components/tibber_prices/period_utils.py +++ b/custom_components/tibber_prices/period_utils.py @@ -3,13 +3,23 @@ from __future__ import annotations import logging -from datetime import date, timedelta +from datetime import date, datetime, timedelta from typing import Any, NamedTuple from homeassistant.util import dt as dt_util -from .const import DEFAULT_PRICE_RATING_THRESHOLD_HIGH, DEFAULT_PRICE_RATING_THRESHOLD_LOW -from .price_utils import aggregate_period_levels, aggregate_period_ratings +from .const import ( + DEFAULT_PRICE_RATING_THRESHOLD_HIGH, + DEFAULT_PRICE_RATING_THRESHOLD_LOW, + DEFAULT_VOLATILITY_THRESHOLD_HIGH, + DEFAULT_VOLATILITY_THRESHOLD_MODERATE, + DEFAULT_VOLATILITY_THRESHOLD_VERY_HIGH, +) +from .price_utils import ( + aggregate_period_levels, + aggregate_period_ratings, + calculate_volatility_level, +) _LOGGER = logging.getLogger(__name__) @@ -25,6 +35,45 @@ class PeriodConfig(NamedTuple): min_period_length: int threshold_low: float = DEFAULT_PRICE_RATING_THRESHOLD_LOW threshold_high: float = DEFAULT_PRICE_RATING_THRESHOLD_HIGH + threshold_volatility_moderate: float = DEFAULT_VOLATILITY_THRESHOLD_MODERATE + threshold_volatility_high: float = DEFAULT_VOLATILITY_THRESHOLD_HIGH + threshold_volatility_very_high: float = DEFAULT_VOLATILITY_THRESHOLD_VERY_HIGH + + +class PeriodData(NamedTuple): + """Data for building a period summary.""" + + start_time: datetime + end_time: datetime + period_length: int + period_idx: int + total_periods: int + + +class PeriodStatistics(NamedTuple): + """Calculated statistics for a period.""" + + aggregated_level: str | None + aggregated_rating: str | None + rating_difference_pct: float | None + price_avg: float + price_min: float + price_max: float + price_spread: float + volatility: str + period_price_diff: float | None + period_price_diff_pct: float | None + + +class ThresholdConfig(NamedTuple): + """Threshold configuration for period calculations.""" + + threshold_low: float | None + threshold_high: float | None + threshold_volatility_moderate: float + threshold_volatility_high: float + threshold_volatility_very_high: float + reverse_sort: bool def calculate_periods( @@ -117,11 +166,19 @@ def calculate_periods( # Step 8: Extract lightweight period summaries (no full price data) # Note: Filtering for current/future is done here based on end date, # not start date. This preserves periods that started yesterday but end today. + thresholds = ThresholdConfig( + threshold_low=threshold_low, + threshold_high=threshold_high, + threshold_volatility_moderate=config.threshold_volatility_moderate, + threshold_volatility_high=config.threshold_volatility_high, + threshold_volatility_very_high=config.threshold_volatility_very_high, + reverse_sort=reverse_sort, + ) period_summaries = _extract_period_summaries( raw_periods, all_prices_sorted, - threshold_low=threshold_low, - threshold_high=threshold_high, + price_context, + thresholds, ) return { @@ -350,30 +407,183 @@ def _filter_periods_by_end_date(periods: list[list[dict]]) -> list[list[dict]]: return filtered +def _calculate_period_price_diff( + price_avg: float, + start_time: datetime, + price_context: dict[str, Any], +) -> tuple[float | None, float | None]: + """ + Calculate period price difference from daily reference (min or max). + + Uses reference price from start day of the period for consistency. + + Returns: + Tuple of (period_price_diff, period_price_diff_pct) or (None, None) if no reference available. + + """ + if not price_context or not start_time: + return None, None + + ref_prices = price_context.get("ref_prices", {}) + date_key = start_time.date() + ref_price = ref_prices.get(date_key) + + if ref_price is None: + return None, None + + # Convert reference price to minor units (ct/øre) + ref_price_minor = round(ref_price * 100, 2) + period_price_diff = round(price_avg - ref_price_minor, 2) + period_price_diff_pct = None + if ref_price_minor != 0: + period_price_diff_pct = round((period_price_diff / ref_price_minor) * 100, 2) + + return period_price_diff, period_price_diff_pct + + +def _calculate_aggregated_rating_difference(period_price_data: list[dict]) -> float | None: + """ + Calculate aggregated rating difference percentage for the period. + + Takes the average of all interval differences (from their respective thresholds). + + Args: + period_price_data: List of price data dictionaries with "difference" field + + Returns: + Average difference percentage, or None if no valid data + + """ + differences = [] + for price_data in period_price_data: + diff = price_data.get("difference") + if diff is not None: + differences.append(float(diff)) + + if not differences: + return None + + return round(sum(differences) / len(differences), 2) + + +def _calculate_period_price_statistics(period_price_data: list[dict]) -> dict[str, float]: + """ + Calculate price statistics for a period. + + Args: + period_price_data: List of price data dictionaries with "total" field + + Returns: + Dictionary with price_avg, price_min, price_max, price_spread (all in minor units: ct/øre) + + """ + prices_minor = [round(float(p["total"]) * 100, 2) for p in period_price_data] + + if not prices_minor: + return { + "price_avg": 0.0, + "price_min": 0.0, + "price_max": 0.0, + "price_spread": 0.0, + } + + price_avg = round(sum(prices_minor) / len(prices_minor), 2) + price_min = round(min(prices_minor), 2) + price_max = round(max(prices_minor), 2) + price_spread = round(price_max - price_min, 2) + + return { + "price_avg": price_avg, + "price_min": price_min, + "price_max": price_max, + "price_spread": price_spread, + } + + +def _build_period_summary_dict( + period_data: PeriodData, + stats: PeriodStatistics, + *, + reverse_sort: bool, +) -> dict: + """ + Build the complete period summary dictionary. + + Args: + period_data: Period timing and position data + stats: Calculated period statistics + reverse_sort: True for peak price, False for best price (keyword-only) + + Returns: + Complete period summary dictionary following attribute ordering + + """ + # Build complete period summary (following attribute ordering from copilot-instructions.md) + summary = { + # 1. Time information (when does this apply?) + "start": period_data.start_time, + "end": period_data.end_time, + "duration_minutes": period_data.period_length * MINUTES_PER_INTERVAL, + # 2. Core decision attributes (what should I do?) + "level": stats.aggregated_level, + "rating_level": stats.aggregated_rating, + "rating_difference_%": stats.rating_difference_pct, + # 3. Price statistics (how much does it cost?) + "price_avg": stats.price_avg, + "price_min": stats.price_min, + "price_max": stats.price_max, + "price_spread": stats.price_spread, + "volatility": stats.volatility, + # 4. Price differences will be added below if available + # 5. Detail information (additional context) + "period_interval_count": period_data.period_length, + "period_position": period_data.period_idx, + # 6. Meta information (technical details) + "periods_total": period_data.total_periods, + "periods_remaining": period_data.total_periods - period_data.period_idx, + } + + # Add period price difference attributes based on sensor type (step 4) + if stats.period_price_diff is not None: + if reverse_sort: + # Peak price sensor: compare to daily maximum + summary["period_price_diff_from_daily_max"] = stats.period_price_diff + if stats.period_price_diff_pct is not None: + summary["period_price_diff_from_daily_max_%"] = stats.period_price_diff_pct + else: + # Best price sensor: compare to daily minimum + summary["period_price_diff_from_daily_min"] = stats.period_price_diff + if stats.period_price_diff_pct is not None: + summary["period_price_diff_from_daily_min_%"] = stats.period_price_diff_pct + + return summary + + def _extract_period_summaries( periods: list[list[dict]], all_prices: list[dict], - *, - threshold_low: float | None, - threshold_high: float | None, + price_context: dict[str, Any], + thresholds: ThresholdConfig, ) -> list[dict]: """ - Extract lightweight period summaries without storing full price data. + Extract complete period summaries with all aggregated attributes. - Returns minimal information needed to identify periods: - - start/end timestamps - - interval count - - duration - - aggregated level (from API's "level" field) - - aggregated rating_level (from calculated "rating_level" field) + Returns sensor-ready period summaries with: + - Timestamps and positioning (start, end, hour, minute, time) + - Aggregated price statistics (price_avg, price_min, price_max, price_spread) + - Volatility categorization (low/moderate/high/very_high based on absolute spread) + - Rating difference percentage (aggregated from intervals) + - Period price differences (period_price_diff_from_daily_min/max) + - Aggregated level and rating_level + - Interval count (number of 15-min intervals in period) - Sensors can use these summaries to query the actual price data from priceInfo on demand. + All data is pre-calculated and ready for display - no further processing needed. Args: periods: List of periods, where each period is a list of interval dictionaries all_prices: All price data from the API (enriched with level, difference, rating_level) - threshold_low: Low threshold for rating level calculation - threshold_high: High threshold for rating level calculation + price_context: Dictionary with ref_prices and avg_prices per day + thresholds: Threshold configuration for calculations """ # Build lookup dictionary for full price data by timestamp @@ -385,8 +595,9 @@ def _extract_period_summaries( price_lookup[starts_at.isoformat()] = price_data summaries = [] + total_periods = len(periods) - for period in periods: + for period_idx, period in enumerate(periods, 1): if not period: continue @@ -399,14 +610,13 @@ def _extract_period_summaries( if not start_time or not end_time: continue - # Collect interval timestamps - interval_starts = [ - start.isoformat() for interval in period if (start := interval.get("interval_start")) is not None - ] - # Look up full price data for each interval in the period period_price_data: list[dict] = [] - for start_iso in interval_starts: + for interval in period: + start = interval.get("interval_start") + if not start: + continue + start_iso = start.isoformat() price_data = price_lookup.get(start_iso) if price_data: period_price_data.append(price_data) @@ -420,25 +630,54 @@ def _extract_period_summaries( aggregated_level = aggregate_period_levels(period_price_data) # Aggregate rating_level (from calculated "rating_level" and "difference" fields) - if threshold_low is not None and threshold_high is not None: + if thresholds.threshold_low is not None and thresholds.threshold_high is not None: aggregated_rating, _ = aggregate_period_ratings( period_price_data, - threshold_low, - threshold_high, + thresholds.threshold_low, + thresholds.threshold_high, ) - summary = { - "start": start_time, - "end": end_time, - "interval_count": len(period), - "duration_minutes": len(period) * MINUTES_PER_INTERVAL, - # Store interval timestamps for reference (minimal data) - "interval_starts": interval_starts, - # Aggregated attributes - "level": aggregated_level, - "rating_level": aggregated_rating, - } + # Calculate price statistics (in minor units: ct/øre) + price_stats = _calculate_period_price_statistics(period_price_data) + # Calculate period price difference from daily reference + period_price_diff, period_price_diff_pct = _calculate_period_price_diff( + price_stats["price_avg"], start_time, price_context + ) + + # Calculate volatility (categorical) and aggregated rating difference (numeric) + volatility = calculate_volatility_level( + price_stats["price_spread"], + threshold_moderate=thresholds.threshold_volatility_moderate, + threshold_high=thresholds.threshold_volatility_high, + threshold_very_high=thresholds.threshold_volatility_very_high, + ).lower() + rating_difference_pct = _calculate_aggregated_rating_difference(period_price_data) + + # Build period data and statistics objects + period_data = PeriodData( + start_time=start_time, + end_time=end_time, + period_length=len(period), + period_idx=period_idx, + total_periods=total_periods, + ) + + stats = PeriodStatistics( + aggregated_level=aggregated_level, + aggregated_rating=aggregated_rating, + rating_difference_pct=rating_difference_pct, + price_avg=price_stats["price_avg"], + price_min=price_stats["price_min"], + price_max=price_stats["price_max"], + price_spread=price_stats["price_spread"], + volatility=volatility, + period_price_diff=period_price_diff, + period_price_diff_pct=period_price_diff_pct, + ) + + # Build complete period summary + summary = _build_period_summary_dict(period_data, stats, reverse_sort=thresholds.reverse_sort) summaries.append(summary) return summaries diff --git a/custom_components/tibber_prices/price_utils.py b/custom_components/tibber_prices/price_utils.py index 4801fab..8e78a29 100644 --- a/custom_components/tibber_prices/price_utils.py +++ b/custom_components/tibber_prices/price_utils.py @@ -8,13 +8,66 @@ from typing import Any from homeassistant.util import dt as dt_util -from .const import PRICE_LEVEL_MAPPING, PRICE_LEVEL_NORMAL, PRICE_RATING_NORMAL +from .const import ( + DEFAULT_VOLATILITY_THRESHOLD_HIGH, + DEFAULT_VOLATILITY_THRESHOLD_MODERATE, + DEFAULT_VOLATILITY_THRESHOLD_VERY_HIGH, + PRICE_LEVEL_MAPPING, + PRICE_LEVEL_NORMAL, + PRICE_RATING_NORMAL, + VOLATILITY_HIGH, + VOLATILITY_LOW, + VOLATILITY_MODERATE, + VOLATILITY_VERY_HIGH, +) _LOGGER = logging.getLogger(__name__) MINUTES_PER_INTERVAL = 15 +def calculate_volatility_level( + spread: float, + threshold_moderate: float | None = None, + threshold_high: float | None = None, + threshold_very_high: float | None = None, +) -> str: + """ + Calculate volatility level from price spread. + + Volatility indicates how much prices fluctuate during a period, which helps + determine whether active load shifting is worthwhile. + + Args: + spread: Absolute price difference between max and min (in minor currency units, e.g., ct or øre) + threshold_moderate: Custom threshold for MODERATE level (default: use DEFAULT_VOLATILITY_THRESHOLD_MODERATE) + threshold_high: Custom threshold for HIGH level (default: use DEFAULT_VOLATILITY_THRESHOLD_HIGH) + threshold_very_high: Custom threshold for VERY_HIGH level (default: use DEFAULT_VOLATILITY_THRESHOLD_VERY_HIGH) + + Returns: + Volatility level: "LOW", "MODERATE", "HIGH", or "VERY_HIGH" (uppercase) + + Examples: + - spread < 5: LOW → minimal optimization potential + - 5 ≤ spread < 15: MODERATE → some optimization worthwhile + - 15 ≤ spread < 30: HIGH → strong optimization recommended + - spread ≥ 30: VERY_HIGH → maximum optimization potential + + """ + # Use provided thresholds or fall back to constants + t_moderate = threshold_moderate if threshold_moderate is not None else DEFAULT_VOLATILITY_THRESHOLD_MODERATE + t_high = threshold_high if threshold_high is not None else DEFAULT_VOLATILITY_THRESHOLD_HIGH + t_very_high = threshold_very_high if threshold_very_high is not None else DEFAULT_VOLATILITY_THRESHOLD_VERY_HIGH + + if spread < t_moderate: + return VOLATILITY_LOW + if spread < t_high: + return VOLATILITY_MODERATE + if spread < t_very_high: + return VOLATILITY_HIGH + return VOLATILITY_VERY_HIGH + + def calculate_trailing_average_for_interval( interval_start: datetime, all_prices: list[dict[str, Any]], diff --git a/custom_components/tibber_prices/sensor.py b/custom_components/tibber_prices/sensor.py index c9842a3..0e2e003 100644 --- a/custom_components/tibber_prices/sensor.py +++ b/custom_components/tibber_prices/sensor.py @@ -40,7 +40,6 @@ from .const import ( PRICE_LEVEL_MAPPING, PRICE_RATING_MAPPING, async_get_entity_description, - calculate_volatility_level, format_price_unit_minor, get_entity_description, get_price_level_translation, @@ -52,6 +51,7 @@ from .price_utils import ( aggregate_price_levels, aggregate_price_rating, calculate_price_trend, + calculate_volatility_level, find_price_data_for_interval, ) diff --git a/custom_components/tibber_prices/services.py b/custom_components/tibber_prices/services.py index cfabbcb..b00cd56 100644 --- a/custom_components/tibber_prices/services.py +++ b/custom_components/tibber_prices/services.py @@ -35,9 +35,9 @@ from .const import ( PRICE_RATING_HIGH, PRICE_RATING_LOW, PRICE_RATING_NORMAL, - calculate_volatility_level, get_price_level_translation, ) +from .price_utils import calculate_volatility_level PRICE_SERVICE_NAME = "get_price" APEXCHARTS_DATA_SERVICE_NAME = "get_apexcharts_data"