diff --git a/custom_components/tibber_prices/coordinator/period_handlers/core.py b/custom_components/tibber_prices/coordinator/period_handlers/core.py index 55f0d68..6087149 100644 --- a/custom_components/tibber_prices/coordinator/period_handlers/core.py +++ b/custom_components/tibber_prices/coordinator/period_handlers/core.py @@ -5,6 +5,8 @@ from __future__ import annotations from typing import TYPE_CHECKING, Any if TYPE_CHECKING: + from datetime import datetime + from custom_components.tibber_prices.coordinator.time_service import TibberPricesTimeService from .types import TibberPricesPeriodConfig @@ -31,6 +33,7 @@ from .types import TibberPricesThresholdConfig # Flex limits to prevent degenerate behavior (see docs/development/period-calculation-theory.md) MAX_SAFE_FLEX = 0.50 # 50% - hard cap: above this, period detection becomes unreliable MAX_OUTLIER_FLEX = 0.25 # 25% - cap for outlier filtering: above this, spike detection too permissive +MIN_SEGMENT_FORCING_INTERVALS = 8 # Minimum intervals per day half to attempt segment forcing (< 2 hours is too few) def calculate_periods( @@ -39,6 +42,7 @@ def calculate_periods( config: TibberPricesPeriodConfig, time: TibberPricesTimeService, day_patterns_by_date: dict | None = None, + time_range: tuple[datetime, datetime] | None = None, ) -> dict[str, Any]: """ Calculate price periods (best or peak) from price data. @@ -61,6 +65,9 @@ def calculate_periods( min_period_length, threshold_low, and threshold_high. time: TibberPricesTimeService instance (required). day_patterns_by_date: Optional dict mapping date → day pattern dict for geometric flex bonus. + time_range: Optional (start_inclusive, end_exclusive) window passed through to + build_periods(). When set, only intervals within [start, end) are considered + as period candidates. Used by Phase 4 segment forcing. Returns: Dict with: @@ -168,6 +175,7 @@ def calculate_periods( level_filter=config.level_filter, gap_count=config.gap_count, time=time, + time_range=time_range, ) _LOGGER.debug( @@ -178,6 +186,24 @@ def calculate_periods( config.level_filter or "None", ) + # Step 3.5: Segment forcing for W/M-shaped days (opt-in, default disabled) + # For days detected as W-shape (DOUBLE_VALLEY for best) or M-shape (DOUBLE_PEAK for peak), + # ensures each price valley/peak segment has at least segment_min_periods periods. + if config.segment_forcing and day_patterns_by_date: + raw_periods = _apply_segment_forcing( + all_prices_smoothed, + raw_periods, + price_context, + config, + day_patterns_by_date=day_patterns_by_date, + time=time, + ) + _LOGGER.debug( + "%sAfter segment_forcing: %d periods total", + INDENT_L0, + len(raw_periods), + ) + # Step 4: Filter by minimum length raw_periods = filter_periods_by_min_length(raw_periods, min_period_length, time=time) _LOGGER.debug( @@ -264,3 +290,168 @@ def calculate_periods( "avg_prices": {k.isoformat(): v for k, v in avg_price_by_day.items()}, }, } + + +# ─── Segment forcing helpers ────────────────────────────────────────────────── + + +def _period_belongs_to_side( + period: list[dict], + side_times: set, + time: "TibberPricesTimeService", +) -> bool: + """Return True if the majority of a period's intervals are in side_times.""" + if not period: + return False + in_side = sum(1 for iv in period if time.get_interval_time(iv) in side_times) + return in_side * 2 >= len(period) + + +def _apply_segment_forcing( # noqa: PLR0913 + all_prices_smoothed: list[dict], + periods: list[list[dict]], + price_context: dict[str, Any], + config: "TibberPricesPeriodConfig", + *, + day_patterns_by_date: dict, + time: "TibberPricesTimeService", +) -> list[list[dict]]: + """ + Force at least segment_min_periods periods per segment for W/M-shaped days. + + For DOUBLE_VALLEY days (best price): splits at the central price peak and + ensures each valley side has the required number of periods. + For DOUBLE_PEAK days (peak price): splits at the central price valley and + ensures each peak side has the required number of periods. + + Args: + all_prices_smoothed: Outlier-filtered prices used for period building. + periods: Already-found periods from the global build_periods call. + price_context: Context dict with reference/average prices + filter settings. + config: Period configuration including segment_forcing parameters. + day_patterns_by_date: Detected day patterns keyed by date. + time: TibberPricesTimeService instance. + + Returns: + Updated periods list with any new segment-forced periods appended. + + """ + import logging # noqa: PLC0415 + + from .period_building import build_periods # noqa: PLC0415 + from .types import DAY_PATTERN_DOUBLE_PEAK, DAY_PATTERN_DOUBLE_VALLEY, INDENT_L1, INDENT_L2 # noqa: PLC0415 + + _LOGGER = logging.getLogger(__name__) # noqa: N806 + + reverse_sort = config.reverse_sort + target_pattern = DAY_PATTERN_DOUBLE_PEAK if reverse_sort else DAY_PATTERN_DOUBLE_VALLEY + segment_min_periods = config.segment_min_periods + + merged_periods = list(periods) + + for day_date, day_pattern in day_patterns_by_date.items(): + if day_pattern is None or day_pattern.get("pattern") != target_pattern: + continue + + # Collect and sort this day's intervals + day_intervals = sorted( + ( + iv + for iv in all_prices_smoothed + if (t := time.get_interval_time(iv)) is not None and t.date() == day_date + ), + key=time.get_interval_time, # type: ignore[arg-type] + ) + if len(day_intervals) < MIN_SEGMENT_FORCING_INTERVALS: # need at least a few intervals per segment + continue + + # Find the central extremum in the middle 50% of the day + # DOUBLE_VALLEY → central peak = highest price between the two valleys + # DOUBLE_PEAK → central valley = lowest price between the two peaks + n = len(day_intervals) + middle = day_intervals[n // 4 : 3 * n // 4] + if not middle: + continue + + if not reverse_sort: + split_iv = max(middle, key=lambda iv: iv.get("total") or 0) + else: + split_iv = min(middle, key=lambda iv: iv.get("total") or float("inf")) + + split_time = time.get_interval_time(split_iv) + if split_time is None: + continue + + side_a = [iv for iv in day_intervals if (t := time.get_interval_time(iv)) is not None and t <= split_time] + side_b = [iv for iv in day_intervals if (t := time.get_interval_time(iv)) is not None and t > split_time] + + _LOGGER.debug( + "%sSegment forcing %s (%s): split at %s (%d+%d intervals)", + INDENT_L1, + day_date, + target_pattern, + split_time.strftime("%H:%M"), + len(side_a), + len(side_b), + ) + + for side_name, side_intervals in (("A", side_a), ("B", side_b)): + side_times = {time.get_interval_time(iv) for iv in side_intervals} + count_in_side = sum(1 for p in merged_periods if _period_belongs_to_side(p, side_times, time)) + + _LOGGER.debug( + "%sSide %s: %d existing periods (need %d)", + INDENT_L2, + side_name, + count_in_side, + segment_min_periods, + ) + + if count_in_side >= segment_min_periods: + continue + + # Run period detection restricted to this segment side via time_range. + # The full all_prices_smoothed (including other days) is passed so that + # reference price context remains day-wide; time_range restricts which + # intervals are EVALUATED as period candidates to this side only. + sorted_side = sorted(side_intervals, key=time.get_interval_time) # type: ignore[arg-type] + side_start = time.get_interval_time(sorted_side[0]) + # end = one interval duration past the last interval's start + side_end = time.get_interval_time(sorted_side[-1]) + if side_start is None or side_end is None: + continue + side_end = side_end + time.get_interval_duration() + new_raw = build_periods( + all_prices_smoothed, + price_context, + reverse_sort=reverse_sort, + level_filter=config.level_filter, + gap_count=config.gap_count, + time=time, + time_range=(side_start, side_end), + ) + + # Add non-duplicate periods; flag them with segment_forced=True + added = 0 + for new_period in new_raw: + new_times = {time.get_interval_time(iv) for iv in new_period if time.get_interval_time(iv) is not None} + is_dup = any( + bool( + new_times + & {time.get_interval_time(iv) for iv in existing if time.get_interval_time(iv) is not None} + ) + for existing in merged_periods + ) + if not is_dup: + merged_periods.append([{**iv, "segment_forced": True} for iv in new_period]) + added += 1 + + _LOGGER.debug( + "%sSide %s: added %d forced periods (%d candidates from restricted run)", + INDENT_L2, + side_name, + added, + len(new_raw), + ) + + return merged_periods diff --git a/custom_components/tibber_prices/coordinator/period_handlers/period_building.py b/custom_components/tibber_prices/coordinator/period_handlers/period_building.py index fcb2720..9b828d6 100644 --- a/custom_components/tibber_prices/coordinator/period_handlers/period_building.py +++ b/custom_components/tibber_prices/coordinator/period_handlers/period_building.py @@ -62,6 +62,7 @@ def build_periods( # noqa: PLR0913, PLR0915, PLR0912 - Complex period building level_filter: str | None = None, gap_count: int = 0, time: TibberPricesTimeService, + time_range: tuple[datetime, datetime] | None = None, ) -> list[list[dict]]: """ Build periods, allowing periods to cross midnight (day boundary). @@ -78,6 +79,10 @@ def build_periods( # noqa: PLR0913, PLR0915, PLR0912 - Complex period building level_filter: Level filter string ("cheap", "expensive", "any", None) gap_count: Number of allowed consecutive intervals deviating by exactly 1 level step time: TibberPricesTimeService instance (required) + time_range: Optional (start_inclusive, end_exclusive) window. When set, only intervals + within [start, end) are considered as period candidates. Reference prices + (from price_context) remain day-wide and are unaffected by this filter. + Used by Phase 4 segment forcing to restrict detection to one segment side. """ ref_prices = price_context["ref_prices"] @@ -132,6 +137,11 @@ def build_periods( # noqa: PLR0913, PLR0915, PLR0912 - Complex period building starts_at = time.get_interval_time(price_data) if starts_at is None: continue + + # Filter by time range if specified (Phase 4 segment forcing) + if time_range is not None and not (time_range[0] <= starts_at < time_range[1]): + continue + date_key = starts_at.date() # Use smoothed price for criteria checks (flex/distance) diff --git a/custom_components/tibber_prices/coordinator/period_handlers/period_statistics.py b/custom_components/tibber_prices/coordinator/period_handlers/period_statistics.py index bf05ae6..963af4c 100644 --- a/custom_components/tibber_prices/coordinator/period_handlers/period_statistics.py +++ b/custom_components/tibber_prices/coordinator/period_handlers/period_statistics.py @@ -23,6 +23,8 @@ from custom_components.tibber_prices.utils.price import ( calculate_volatility_level, ) +from .types import LOW_PRICE_QUALITY_BYPASS_THRESHOLD, PERIOD_MAX_CV + def calculate_period_price_diff( price_mean: float, @@ -220,18 +222,57 @@ def build_period_summary_dict( return summary -def _add_interval_flag_counts(summary: dict, period: list[dict]) -> None: - """Add optional interval flag counts to period summary.""" +def _strip_geo_from_edges(period: list[dict]) -> list[dict]: + """ + Remove geo-bonus intervals from leading and trailing edges of a period. + + Used by Phase 3 CV gate: when a period with geometric extension fails the CV quality + gate, the edge intervals that were included only via geo-bonus flex are stripped to + restore the period's unextended (tighter) boundaries. + + Geo-bonus intervals in the MIDDLE of a period are preserved (they represent + intervals genuinely inside the valley/peak zone, not boundary extensions). + + Returns an empty list only when all intervals are geo-bonus (degenerate case). + """ + start = 0 + end = len(period) + while start < end and period[start].get("geometric_bonus_applied", False): + start += 1 + while end > start and period[end - 1].get("geometric_bonus_applied", False): + end -= 1 + return period[start:end] + + +def _add_interval_flag_counts(summary: dict, period: list[dict], *, geo_extension_status: str | None = None) -> None: + """ + Add optional interval flag counts to period summary. + + Args: + summary: Period summary dict to augment in-place. + period: Raw interval list (may already be stripped of geo-bonus edges). + geo_extension_status: "active" if geometric extension passed the CV gate, + "attempted" if it was tried but CV gate failed and period was reverted. + + """ if (count := sum(1 for i in period if i.get("smoothing_was_impactful", False))) > 0: summary["period_interval_smoothed_count"] = count if (count := sum(1 for i in period if i.get("is_level_gap", False))) > 0: summary["period_interval_level_gap_count"] = count - if (count := sum(1 for i in period if i.get("geometric_bonus_applied", False))) > 0: + # Geometric extension: distinguish "active" (CV passed) from "attempted" (CV failed → reverted) + if geo_extension_status == "active": + count = sum(1 for i in period if i.get("geometric_bonus_applied", False)) summary["geometric_extension_active"] = True summary["geometric_extension_intervals"] = count + elif geo_extension_status == "attempted": + # CV gate failed: geo extension was tried but period was reverted to base boundaries. + # The summary uses unextended (stripped) boundaries; this flag marks the attempt. + summary["geometric_extension_attempted"] = True + if any(i.get("segment_forced", False) for i in period): + summary["segment_forced"] = True -def extract_period_summaries( +def extract_period_summaries( # noqa: PLR0912, PLR0915 - CV pre-check for geo-extension adds necessary branches/statements periods: list[list[dict]], all_prices: list[dict], price_context: dict[str, Any], @@ -280,6 +321,34 @@ def extract_period_summaries( if not period: continue + # Phase 3: Geometric extension CV gate check + # If this period contains geo-bonus intervals, pre-check whether the full period + # passes the CV quality gate. If it fails, revert to base boundaries by stripping + # geo-bonus intervals from the edges and mark with geometric_extension_attempted. + geo_extension_status: str | None = None + if any(iv.get("geometric_bonus_applied", False) for iv in period): + full_prices: list[float] = [] + for iv in period: + start_iv = iv.get("interval_start") + if start_iv: + p = price_lookup.get(start_iv.isoformat()) + if p: + full_prices.append(float(p["total"])) + if full_prices: + full_cv = calculate_coefficient_of_variation(full_prices) + cv_fails = ( + full_cv is not None + and sum(full_prices) / len(full_prices) >= LOW_PRICE_QUALITY_BYPASS_THRESHOLD + and full_cv > PERIOD_MAX_CV + ) + if cv_fails: + base_period = _strip_geo_from_edges(period) + if base_period: + period = base_period # noqa: PLW2901 - intentional period replacement + geo_extension_status = "attempted" + else: + geo_extension_status = "active" + first_interval = period[0] last_interval = period[-1] @@ -369,7 +438,7 @@ def extract_period_summaries( ) # Add optional interval flag counts (smoothing, level gaps, geometric extension) - _add_interval_flag_counts(summary, period) + _add_interval_flag_counts(summary, period, geo_extension_status=geo_extension_status) summaries.append(summary) diff --git a/custom_components/tibber_prices/coordinator/period_handlers/relaxation.py b/custom_components/tibber_prices/coordinator/period_handlers/relaxation.py index b4b73bb..c1475f5 100644 --- a/custom_components/tibber_prices/coordinator/period_handlers/relaxation.py +++ b/custom_components/tibber_prices/coordinator/period_handlers/relaxation.py @@ -8,7 +8,7 @@ from typing import TYPE_CHECKING, Any if TYPE_CHECKING: from collections.abc import Callable - from datetime import date + from datetime import date, datetime from custom_components.tibber_prices.coordinator.time_service import TibberPricesTimeService @@ -22,6 +22,7 @@ from .types import ( INDENT_L0, INDENT_L1, INDENT_L2, + LOW_PRICE_QUALITY_BYPASS_THRESHOLD, PERIOD_MAX_CV, TibberPricesPeriodConfig, ) @@ -41,12 +42,6 @@ FLEX_HIGH_THRESHOLD_RELAXATION = 0.30 # 30% - WARNING: base flex too high for r MIN_DURATION_FALLBACK_MINIMUM = 30 # Minimum period length to try (30 min = 2 intervals) MIN_DURATION_FALLBACK_STEP = 15 # Reduce by 15 min (1 interval) each step -# Low absolute price threshold for quality gate bypass (in major currency unit, e.g. EUR/NOK) -# When the MEAN price of a period is below this level, the CV quality gate is bypassed. -# Relative CV is unreliable at very low absolute prices: a range of 1-4 ct shows CV≈50% -# but is practically homogeneous from a cost perspective. -# Value: LOW_PRICE_AVG_THRESHOLD (subunit) / 100 = 10 ct / 100 = 0.10 EUR/NOK -LOW_PRICE_QUALITY_BYPASS_THRESHOLD = 0.10 # EUR/NOK major unit (= 10 ct/øre) # Span-to-ref ratio threshold for suppressing flex warnings on V-shape days. # When span / ref_price < this on ANY available day, the warning is shown. @@ -527,6 +522,7 @@ def calculate_periods_with_relaxation( # noqa: PLR0912, PLR0913, PLR0915 - Per- time: TibberPricesTimeService, config_entry: Any, # ConfigEntry type day_patterns_by_date: dict | None = None, + time_range: tuple[datetime, datetime] | None = None, ) -> dict[str, Any]: """ Calculate periods with optional per-day filter relaxation. @@ -555,6 +551,9 @@ def calculate_periods_with_relaxation( # noqa: PLR0912, PLR0913, PLR0915 - Per- config_entry: Config entry to get display unit configuration. day_patterns_by_date: Optional dict mapping date → day pattern dict. Used for geometric flex bonus in period detection. Passed through to calculate_periods(). + time_range: Optional (start_inclusive, end_exclusive) datetime window. When set, + only intervals within [start, end) are considered as period candidates. + Passed through to calculate_periods(). Used by Phase 4 segment forcing. Returns: Dict with same format as calculate_periods() output: @@ -712,7 +711,9 @@ def calculate_periods_with_relaxation( # noqa: PLR0912, PLR0913, PLR0915 - Per- # === BASELINE CALCULATION (process ALL prices together, including yesterday) === # Periods that ended before yesterday will be filtered out later by filter_periods_by_end_date() # This keeps yesterday/today/tomorrow periods in the cache - baseline_result = calculate_periods(all_prices, config=config, time=time, day_patterns_by_date=day_patterns_by_date) + baseline_result = calculate_periods( + all_prices, config=config, time=time, day_patterns_by_date=day_patterns_by_date, time_range=time_range + ) all_periods = baseline_result["periods"] # Count periods per day for min_periods check @@ -955,7 +956,10 @@ def relax_all_prices( # noqa: PLR0913 - Comprehensive filter relaxation require # Process ALL prices together (allows midnight crossing) result = calculate_periods( - all_prices, config=relaxed_config, time=time, day_patterns_by_date=day_patterns_by_date + all_prices, + config=relaxed_config, + time=time, + day_patterns_by_date=day_patterns_by_date, ) new_periods = result["periods"] diff --git a/custom_components/tibber_prices/coordinator/period_handlers/types.py b/custom_components/tibber_prices/coordinator/period_handlers/types.py index 72c2ad5..1ebf309 100644 --- a/custom_components/tibber_prices/coordinator/period_handlers/types.py +++ b/custom_components/tibber_prices/coordinator/period_handlers/types.py @@ -22,6 +22,13 @@ from custom_components.tibber_prices.const import ( # Period with prices 0.5-1.0 kr has ~30% CV which would be rejected PERIOD_MAX_CV = 25.0 # 25% max coefficient of variation within a period +# Low absolute price threshold for quality gate bypass (in major currency unit, e.g. EUR/NOK) +# When the MEAN price of a period is below this level, the CV quality gate is bypassed. +# Relative CV is unreliable at very low absolute prices: a range of 1-4 ct shows CV≈50% +# but is practically homogeneous from a cost perspective. +# Value: 10 ct / 100 = 0.10 EUR/NOK +LOW_PRICE_QUALITY_BYPASS_THRESHOLD = 0.10 # EUR/NOK major unit (= 10 ct/øre) + # Cross-Day Extension: Time window constants # When a period ends late in the day and tomorrow data is available, # we can extend it past midnight if prices remain favorable @@ -59,6 +66,8 @@ class TibberPricesPeriodConfig(NamedTuple): extend_to_extreme: bool = False # Extend periods into adjacent VERY_CHEAP/VERY_EXPENSIVE intervals max_extension_intervals: int = 0 # Max intervals this extension may add per side (0 = disabled) geometric_extra_flex: float = 0.0 # Extra flex (decimal) for intervals inside the valley/peak zone (0.0 = disabled) + segment_forcing: bool = False # Force at least segment_min_periods in each W/M-shape segment + segment_min_periods: int = 1 # Minimum periods required per segment when segment_forcing is True class TibberPricesPeriodData(NamedTuple):