diff --git a/custom_components/tibber_prices/coordinator/period_handlers/core.py b/custom_components/tibber_prices/coordinator/period_handlers/core.py index bc4aeb7..caaa365 100644 --- a/custom_components/tibber_prices/coordinator/period_handlers/core.py +++ b/custom_components/tibber_prices/coordinator/period_handlers/core.py @@ -16,8 +16,10 @@ from .period_building import ( add_interval_ends, build_periods, calculate_reference_prices, + extend_periods_across_midnight, filter_periods_by_end_date, filter_periods_by_min_length, + filter_superseded_periods, split_intervals_by_day, ) from .period_statistics import ( @@ -188,7 +190,7 @@ def calculate_periods( # Sensors filter further for today+tomorrow, services can access all cached periods raw_periods = filter_periods_by_end_date(raw_periods, time=time) - # Step 8: Extract lightweight period summaries (no full price data) + # Step 7: Extract lightweight period summaries (no full price data) # Note: Periods are filtered by end date to keep yesterday/today/tomorrow. # This preserves periods that started day-before-yesterday but end yesterday. thresholds = TibberPricesThresholdConfig( @@ -207,6 +209,26 @@ def calculate_periods( time=time, ) + # Step 8: Cross-day extension for late-night periods + # If a best-price period ends near midnight and tomorrow has continued low prices, + # extend the period across midnight to give users the full cheap window + period_summaries = extend_periods_across_midnight( + period_summaries, + all_prices_sorted, + price_context, + time=time, + reverse_sort=reverse_sort, + ) + + # Step 9: Filter superseded periods + # When tomorrow data is available, late-night today periods that were found via + # relaxation may be obsolete if tomorrow has significantly better alternatives + period_summaries = filter_superseded_periods( + period_summaries, + time=time, + reverse_sort=reverse_sort, + ) + return { "periods": period_summaries, # Lightweight summaries only "metadata": { diff --git a/custom_components/tibber_prices/coordinator/period_handlers/period_building.py b/custom_components/tibber_prices/coordinator/period_handlers/period_building.py index a8468bf..e2f4fa3 100644 --- a/custom_components/tibber_prices/coordinator/period_handlers/period_building.py +++ b/custom_components/tibber_prices/coordinator/period_handlers/period_building.py @@ -3,13 +3,12 @@ from __future__ import annotations import logging +from datetime import date, datetime, timedelta from typing import TYPE_CHECKING, Any from custom_components.tibber_prices.const import PRICE_LEVEL_MAPPING if TYPE_CHECKING: - from datetime import date - from custom_components.tibber_prices.coordinator.time_service import TibberPricesTimeService from .level_filtering import ( @@ -281,3 +280,428 @@ def filter_periods_by_end_date(periods: list[list[dict]], *, time: TibberPricesT filtered.append(period) return filtered + + +def _categorize_periods_for_supersession( + period_summaries: list[dict], + today: date, + tomorrow: date, + late_hour_threshold: int, + early_hour_limit: int, +) -> tuple[list[dict], list[dict], list[dict]]: + """Categorize periods into today-late, tomorrow-early, and other.""" + today_late: list[dict] = [] + tomorrow_early: list[dict] = [] + other: list[dict] = [] + + for period in period_summaries: + period_start = period.get("start") + period_end = period.get("end") + + if not period_start or not period_end: + other.append(period) + # Today late-night periods: START today at or after late_hour_threshold (e.g., 20:00) + # Note: period_end could be tomorrow (e.g., 23:30-00:00 spans midnight) + elif period_start.date() == today and period_start.hour >= late_hour_threshold: + today_late.append(period) + # Tomorrow early-morning periods: START tomorrow before early_hour_limit (e.g., 08:00) + elif period_start.date() == tomorrow and period_start.hour < early_hour_limit: + tomorrow_early.append(period) + else: + other.append(period) + + return today_late, tomorrow_early, other + + +def _filter_superseded_today_periods( + today_late_periods: list[dict], + best_tomorrow: dict, + best_tomorrow_price: float, + improvement_threshold: float, +) -> list[dict]: + """Filter today periods that are superseded by a better tomorrow period.""" + kept: list[dict] = [] + + for today_period in today_late_periods: + today_price = today_period.get("price_mean") + + if today_price is None: + kept.append(today_period) + continue + + # Calculate how much better tomorrow is (as percentage) + improvement_pct = ((today_price - best_tomorrow_price) / today_price * 100) if today_price > 0 else 0 + + _LOGGER.debug( + "Supersession check: Today %s-%s (%.4f) vs Tomorrow %s-%s (%.4f) = %.1f%% improvement (threshold: %.1f%%)", + today_period["start"].strftime("%H:%M"), + today_period["end"].strftime("%H:%M"), + today_price, + best_tomorrow["start"].strftime("%H:%M"), + best_tomorrow["end"].strftime("%H:%M"), + best_tomorrow_price, + improvement_pct, + improvement_threshold, + ) + + if improvement_pct >= improvement_threshold: + _LOGGER.info( + "Period superseded: Today %s-%s (%.2f) replaced by Tomorrow %s-%s (%.2f, %.1f%% better)", + today_period["start"].strftime("%H:%M"), + today_period["end"].strftime("%H:%M"), + today_price, + best_tomorrow["start"].strftime("%H:%M"), + best_tomorrow["end"].strftime("%H:%M"), + best_tomorrow_price, + improvement_pct, + ) + else: + kept.append(today_period) + + return kept + + +def filter_superseded_periods( + period_summaries: list[dict], + *, + time: TibberPricesTimeService, + reverse_sort: bool, +) -> list[dict]: + """ + Filter out late-night today periods that are superseded by better tomorrow periods. + + When tomorrow's data becomes available, some late-night periods that were found + through relaxation may no longer make sense. If tomorrow has a significantly + better period in the early morning, the late-night today period is obsolete. + + Example: + - Today 23:30-00:00 at 0.70 kr (found via relaxation, was best available) + - Tomorrow 04:00-05:30 at 0.50 kr (much better alternative) + → The today period is superseded and should be filtered out + + This only applies to best-price periods (reverse_sort=False). + Peak-price periods are not filtered this way. + + """ + from .types import ( # noqa: PLC0415 + CROSS_DAY_LATE_PERIOD_START_HOUR, + CROSS_DAY_MAX_EXTENSION_HOUR, + SUPERSESSION_PRICE_IMPROVEMENT_PCT, + ) + + _LOGGER.debug( + "filter_superseded_periods called: %d periods, reverse_sort=%s", + len(period_summaries) if period_summaries else 0, + reverse_sort, + ) + + # Only filter for best-price periods + if reverse_sort or not period_summaries: + return period_summaries + + now = time.now() + today = now.date() + tomorrow = today + timedelta(days=1) + + # Categorize periods + today_late, tomorrow_early, other = _categorize_periods_for_supersession( + period_summaries, + today, + tomorrow, + CROSS_DAY_LATE_PERIOD_START_HOUR, + CROSS_DAY_MAX_EXTENSION_HOUR, + ) + + _LOGGER.debug( + "Supersession categorization: today_late=%d, tomorrow_early=%d, other=%d", + len(today_late), + len(tomorrow_early), + len(other), + ) + + # If no tomorrow early periods, nothing to compare against + if not tomorrow_early: + _LOGGER.debug("No tomorrow early periods - skipping supersession check") + return period_summaries + + # Find the best tomorrow early period (lowest mean price) + best_tomorrow = min(tomorrow_early, key=lambda p: p.get("price_mean", float("inf"))) + best_tomorrow_price = best_tomorrow.get("price_mean") + + if best_tomorrow_price is None: + return period_summaries + + # Filter superseded today periods + kept_today = _filter_superseded_today_periods( + today_late, + best_tomorrow, + best_tomorrow_price, + SUPERSESSION_PRICE_IMPROVEMENT_PCT, + ) + + # Reconstruct and sort by start time + result = other + kept_today + tomorrow_early + result.sort(key=lambda p: p.get("start") or time.now()) + + return result + + +def _is_period_eligible_for_extension( + period: dict, + today: date, + late_hour_threshold: int, +) -> bool: + """ + Check if a period is eligible for cross-day extension. + + Eligibility criteria: + - Period has valid start and end times + - Period ends on today (not yesterday or tomorrow) + - Period ends late (after late_hour_threshold, e.g. 20:00) + + """ + period_end = period.get("end") + period_start = period.get("start") + + if not period_end or not period_start: + return False + + if period_end.date() != today: + return False + + return period_end.hour >= late_hour_threshold + + +def _find_extension_intervals( + period_end: datetime, + price_lookup: dict[str, dict], + criteria: Any, + max_extension_time: datetime, + interval_duration: timedelta, +) -> list[dict]: + """ + Find consecutive intervals after period_end that meet criteria. + + Iterates forward from period_end, adding intervals while they + meet the flex and min_distance criteria. Stops at first failure + or when reaching max_extension_time. + + """ + from .level_filtering import check_interval_criteria # noqa: PLC0415 + + extension_intervals: list[dict] = [] + check_time = period_end + + while check_time < max_extension_time: + price_data = price_lookup.get(check_time.isoformat()) + if not price_data: + break # No more data + + price = float(price_data["total"]) + in_flex, meets_min_distance = check_interval_criteria(price, criteria) + + if not (in_flex and meets_min_distance): + break # Criteria no longer met + + extension_intervals.append(price_data) + check_time = check_time + interval_duration + + return extension_intervals + + +def _collect_original_period_prices( + period_start: datetime, + period_end: datetime, + price_lookup: dict[str, dict], + interval_duration: timedelta, +) -> list[float]: + """Collect prices from original period for CV calculation.""" + prices: list[float] = [] + current = period_start + while current < period_end: + price_data = price_lookup.get(current.isoformat()) + if price_data: + prices.append(float(price_data["total"])) + current = current + interval_duration + return prices + + +def _build_extended_period( + period: dict, + extension_intervals: list[dict], + combined_prices: list[float], + combined_cv: float, + interval_duration: timedelta, +) -> dict: + """Create extended period dict with updated statistics.""" + period_start = period["start"] + period_end = period["end"] + new_end = period_end + (interval_duration * len(extension_intervals)) + + extended = period.copy() + extended["end"] = new_end + extended["duration_minutes"] = int((new_end - period_start).total_seconds() / 60) + extended["period_interval_count"] = len(combined_prices) + extended["cross_day_extended"] = True + extended["cross_day_extension_intervals"] = len(extension_intervals) + + # Recalculate price statistics + extended["price_min"] = min(combined_prices) + extended["price_max"] = max(combined_prices) + extended["price_mean"] = sum(combined_prices) / len(combined_prices) + extended["price_spread"] = extended["price_max"] - extended["price_min"] + extended["coefficient_of_variation"] = round(combined_cv, 1) + + return extended + + +def extend_periods_across_midnight( + period_summaries: list[dict], + all_prices: list[dict], + price_context: dict[str, Any], + *, + time: TibberPricesTimeService, + reverse_sort: bool, +) -> list[dict]: + """ + Extend late-night periods across midnight if favorable prices continue. + + When a period ends close to midnight and tomorrow's data shows continued + favorable prices, extend the period into the next day. This prevents + artificial period breaks at midnight when it's actually better to continue. + + Example: Best price period 22:00-23:45 today could extend to 04:00 tomorrow + if prices remain low overnight. + + Rules: + - Only extends periods ending after CROSS_DAY_LATE_PERIOD_START_HOUR (20:00) + - Won't extend beyond CROSS_DAY_MAX_EXTENSION_HOUR (08:00) next day + - Extension must pass same flex criteria as original period + - Quality Gate (CV check) applies to extended period + + Args: + period_summaries: List of period summary dicts (already processed) + all_prices: All price intervals including tomorrow + price_context: Dict with ref_prices, avg_prices, flex, min_distance_from_avg + time: Time service instance + reverse_sort: True for peak price, False for best price + + Returns: + Updated list of period summaries with extensions applied + + """ + from custom_components.tibber_prices.utils.price import calculate_coefficient_of_variation # noqa: PLC0415 + + from .types import ( # noqa: PLC0415 + CROSS_DAY_LATE_PERIOD_START_HOUR, + CROSS_DAY_MAX_EXTENSION_HOUR, + PERIOD_MAX_CV, + TibberPricesIntervalCriteria, + ) + + if not period_summaries or not all_prices: + return period_summaries + + # Build price lookup by timestamp + price_lookup: dict[str, dict] = {} + for price_data in all_prices: + interval_time = time.get_interval_time(price_data) + if interval_time: + price_lookup[interval_time.isoformat()] = price_data + + ref_prices = price_context.get("ref_prices", {}) + avg_prices = price_context.get("avg_prices", {}) + flex = price_context.get("flex", 0.15) + min_distance = price_context.get("min_distance_from_avg", 0) + + now = time.now() + today = now.date() + tomorrow = today + timedelta(days=1) + interval_duration = time.get_interval_duration() + + # Max extension time (e.g., 08:00 tomorrow) + max_extension_time = time.start_of_local_day(now) + timedelta(days=1, hours=CROSS_DAY_MAX_EXTENSION_HOUR) + + extended_summaries = [] + + for period in period_summaries: + # Check eligibility for extension + if not _is_period_eligible_for_extension(period, today, CROSS_DAY_LATE_PERIOD_START_HOUR): + extended_summaries.append(period) + continue + + # Get tomorrow's reference prices + tomorrow_ref = ref_prices.get(tomorrow) or ref_prices.get(str(tomorrow)) + tomorrow_avg = avg_prices.get(tomorrow) or avg_prices.get(str(tomorrow)) + + if tomorrow_ref is None or tomorrow_avg is None: + extended_summaries.append(period) + continue + + # Set up criteria for extension check + criteria = TibberPricesIntervalCriteria( + ref_price=tomorrow_ref, + avg_price=tomorrow_avg, + flex=flex, + min_distance_from_avg=min_distance, + reverse_sort=reverse_sort, + ) + + # Find extension intervals + extension_intervals = _find_extension_intervals( + period["end"], + price_lookup, + criteria, + max_extension_time, + interval_duration, + ) + + if not extension_intervals: + extended_summaries.append(period) + continue + + # Collect all prices for CV check + original_prices = _collect_original_period_prices( + period["start"], + period["end"], + price_lookup, + interval_duration, + ) + extension_prices = [float(p["total"]) for p in extension_intervals] + combined_prices = original_prices + extension_prices + + # Quality Gate: Check CV of extended period + combined_cv = calculate_coefficient_of_variation(combined_prices) + + if combined_cv is not None and combined_cv <= PERIOD_MAX_CV: + # Extension passes quality gate + extended_period = _build_extended_period( + period, + extension_intervals, + combined_prices, + combined_cv, + interval_duration, + ) + + _LOGGER.info( + "Cross-day extension: Period %s-%s extended to %s (+%d intervals, CV=%.1f%%)", + period["start"].strftime("%H:%M"), + period["end"].strftime("%H:%M"), + extended_period["end"].strftime("%H:%M"), + len(extension_intervals), + combined_cv, + ) + extended_summaries.append(extended_period) + else: + # Extension would exceed quality gate + _LOGGER_DETAILS.debug( + "%sCross-day extension rejected for period %s-%s: CV=%.1f%% > %.1f%%", + INDENT_L0, + period["start"].strftime("%H:%M"), + period["end"].strftime("%H:%M"), + combined_cv or 0, + PERIOD_MAX_CV, + ) + extended_summaries.append(period) + + return extended_summaries