From ba3e127ac794b575306643c3c0e3fa13f5a6708b Mon Sep 17 00:00:00 2001 From: Julian Pawlowski Date: Fri, 17 Apr 2026 14:02:02 +0000 Subject: [PATCH] refactor(day_pattern): enhance pattern classification with price boundaries Refactor the pattern classification logic to include start and end prices for better accuracy in identifying day patterns. This change improves the classification of price patterns, particularly for cases involving valleys and peaks. Impact: Users will experience more accurate price pattern classifications, leading to better decision-making based on price trends. --- .../period_handlers/day_pattern.py | 80 +++++++++++--- .../period_handlers/level_filtering.py | 6 +- .../coordinator/period_handlers/relaxation.py | 7 ++ .../period_handlers/shape_extension.py | 100 ++++++++++++++++-- 4 files changed, 173 insertions(+), 20 deletions(-) diff --git a/custom_components/tibber_prices/coordinator/period_handlers/day_pattern.py b/custom_components/tibber_prices/coordinator/period_handlers/day_pattern.py index addcc4f..92325a3 100644 --- a/custom_components/tibber_prices/coordinator/period_handlers/day_pattern.py +++ b/custom_components/tibber_prices/coordinator/period_handlers/day_pattern.py @@ -172,7 +172,13 @@ def _detect_single_day_pattern( extrema = _find_significant_extrema(smoothed, min_amplitude=price_span * MIN_EXTREMUM_AMPLITUDE_RATIO) # ── classify pattern ──────────────────────────────────────────────────────── - pattern, confidence = _classify_pattern(extrema, cv_pct, times) + pattern, confidence = _classify_pattern( + extrema, + cv_pct, + times, + start_price=smoothed[0], + end_price=smoothed[-1], + ) # ── knee points + primary extreme time ───────────────────────────────────── extreme_time: datetime | None = None @@ -208,6 +214,24 @@ def _detect_single_day_pattern( if max_extrema: primary = max(max_extrema, key=lambda e: e["price"]) extreme_time = times[primary["idx"]] if primary["idx"] < len(times) else None + # The valley between the two peaks is the cheap zone for best-price periods. + # Compute knee points around the deepest minimum so that compute_geometric_flex_bonus + # can apply extra flex to intervals in this zone (same mechanism as VALLEY). + min_extrema_dp = [e for e in extrema if e["type"] == "min"] + if min_extrema_dp: + valley_extreme = min(min_extrema_dp, key=lambda e: e["price"]) + lk, rk = _find_knee_points(smoothed, valley_extreme["idx"]) + valley_start = times[lk] if lk is not None and lk < len(times) else None + valley_end = times[rk] if rk is not None and rk < len(times) else None + # The valley between the two peaks is the cheap zone for best-price periods. + # Compute knee points around the deepest minimum so that compute_geometric_flex_bonus + # can apply extra flex to intervals inside this zone (same mechanism as VALLEY). + min_extrema_dp = [e for e in extrema if e["type"] == "min"] + if min_extrema_dp: + valley_extreme = min(min_extrema_dp, key=lambda e: e["price"]) + lk, rk = _find_knee_points(smoothed, valley_extreme["idx"]) + valley_start = times[lk] if lk is not None and lk < len(times) else None + valley_end = times[rk] if rk is not None and rk < len(times) else None # ── intra-day segments ────────────────────────────────────────────────────── segments = _detect_segments(extrema, prices_raw, times) @@ -278,24 +302,41 @@ def _find_significant_extrema( return [] # ── raw local extrema (strict local min/max) ──────────────────────────────── + # NOTE: We intentionally do NOT require the extremum to be below/above the + # day's start and end prices. That check was too restrictive for solar- + # influenced days (spring/summer) where overnight prices are as cheap as the + # midday valley, causing the midday dip to go undetected. The amplitude/ + # prominence filter below is sufficient to suppress noise. candidates: list[dict[str, Any]] = [] for i in range(1, n - 1): prev_p = smoothed[i - 1] cur_p = smoothed[i] next_p = smoothed[i + 1] - if cur_p <= prev_p and cur_p <= next_p and cur_p < smoothed[0] and cur_p < smoothed[-1]: + if cur_p <= prev_p and cur_p <= next_p: candidates.append({"idx": i, "type": "min", "price": cur_p}) - elif cur_p >= prev_p and cur_p >= next_p and cur_p > smoothed[0] and cur_p > smoothed[-1]: + elif cur_p >= prev_p and cur_p >= next_p: candidates.append({"idx": i, "type": "max", "price": cur_p}) if not candidates: return [] # ── amplitude filter ──────────────────────────────────────────────────────── - # For each candidate, compute prominence = distance to the nearest extremum - # of opposite type (or the global opposite extreme if none exist). - # We use a simpler heuristic: compare against the mean of its two flanking - # values in the smoothed series (one window radius on each side). + # For each candidate, measure prominence against the most representative + # reference price available. + # + # Problem with pure local-neighbourhood mean: a broad, flat-bottomed valley + # (e.g. a 5-hour cheap midday zone) pulls the neighbourhood mean down toward + # the valley price itself, making the prominence appear near-zero even though + # the valley is clearly significant on the full day. + # + # Solution: use max(local_mean, day_mean) for minima and min(local_mean, + # day_mean) for maxima. This picks the reference that gives the LARGEST + # separation for genuine extrema: + # - Deep/broad valley: local_mean ≈ valley price → day_mean wins (higher). + # - Overnight plateau max: local_mean ≈ plateau price → day_mean wins (lower). + # - Sharp isolated spike: local_mean already high → day_mean may be lower, + # but the spike still has large prominence either way. + day_mean = sum(smoothed) / len(smoothed) significant: list[dict[str, Any]] = [] for cand in candidates: idx = cand["idx"] @@ -303,11 +344,12 @@ def _find_significant_extrema( lo = max(0, idx - hw) hi = min(n, idx + hw + 1) neighbourhood = smoothed[lo:hi] + local_mean = sum(neighbourhood) / len(neighbourhood) if cand["type"] == "min": - reference = sum(neighbourhood) / len(neighbourhood) + reference = max(local_mean, day_mean) # broad valley: day_mean dominates prominence = reference - cand["price"] else: - reference = sum(neighbourhood) / len(neighbourhood) + reference = min(local_mean, day_mean) # plateau max: day_mean dominates prominence = cand["price"] - reference if prominence >= min_amplitude * 0.8: # slight tolerance on the threshold significant.append(cand) @@ -348,14 +390,18 @@ def _classify_pattern( extrema: list[dict[str, Any]], cv_pct: float, times: list[datetime], + start_price: float = 0.0, + end_price: float = 0.0, ) -> tuple[str, float]: """ Classify the day into a pattern string and confidence score (0-1). Args: - extrema: List of significant extrema (already deduplicated). - cv_pct: Coefficient of variation for the day (%). - times: Timestamps of all intervals (for position calculations). + extrema: List of significant extrema (already deduplicated). + cv_pct: Coefficient of variation for the day (%). + times: Timestamps of all intervals (for position calculations). + start_price: Smoothed price of the first interval (day start). + end_price: Smoothed price of the last interval (day end). Returns: (pattern_string, confidence_float) @@ -401,8 +447,18 @@ def _classify_pattern( # ── two extrema ───────────────────────────────────────────────────────────── if n_extrema == 2: if types == ["max", "min"]: + # Check if max is above both endpoints → genuine interior peak + max_price = extrema[0]["price"] + if start_price > 0 and end_price > 0 and max_price > start_price and max_price > end_price: + return DAY_PATTERN_PEAK, 0.65 return DAY_PATTERN_FALLING, 0.7 if types == ["min", "max"]: + # Check if min is below both endpoints → genuine interior valley + # (avoids misclassifying as RISING a day that starts/ends expensive + # but has a cheap midday zone, e.g. spring solar duck-curve). + min_price = extrema[0]["price"] + if start_price > 0 and end_price > 0 and min_price < start_price and min_price < end_price: + return DAY_PATTERN_VALLEY, 0.65 return DAY_PATTERN_RISING, 0.7 if types == ["min", "min"]: return DAY_PATTERN_DOUBLE_VALLEY, 0.65 diff --git a/custom_components/tibber_prices/coordinator/period_handlers/level_filtering.py b/custom_components/tibber_prices/coordinator/period_handlers/level_filtering.py index 19a582d..7a2fe93 100644 --- a/custom_components/tibber_prices/coordinator/period_handlers/level_filtering.py +++ b/custom_components/tibber_prices/coordinator/period_handlers/level_filtering.py @@ -282,8 +282,10 @@ def compute_geometric_flex_bonus( zone_start = day_pattern.get("peak_start") zone_end = day_pattern.get("peak_end") else: - # Best price: expand inside VALLEY (V/U-shape) zone - if pattern != "valley": + # Best price: expand inside VALLEY zone. + # Also handles DOUBLE_PEAK (solar duck-curve: expensive morning/evening, cheap midday) + # where valley_start/valley_end mark the knee points around the midday minimum. + if pattern not in ("valley", "double_peak"): return 0.0 zone_start = day_pattern.get("valley_start") zone_end = day_pattern.get("valley_end") diff --git a/custom_components/tibber_prices/coordinator/period_handlers/relaxation.py b/custom_components/tibber_prices/coordinator/period_handlers/relaxation.py index 1b5be4d..eeb1eec 100644 --- a/custom_components/tibber_prices/coordinator/period_handlers/relaxation.py +++ b/custom_components/tibber_prices/coordinator/period_handlers/relaxation.py @@ -284,6 +284,7 @@ def _try_min_duration_fallback( existing_periods: list[dict], prices_by_day: dict[date, list[dict]], time: TibberPricesTimeService, + day_patterns_by_date: dict | None = None, ) -> tuple[dict[str, Any] | None, dict[str, Any]]: """ Try reducing min_period_length to find periods when relaxation is exhausted. @@ -303,6 +304,8 @@ def _try_min_duration_fallback( existing_periods: Periods found so far (from relaxation) prices_by_day: Price intervals grouped by day time: Time service instance + day_patterns_by_date: Optional dict mapping date → day pattern dict. Used for + geometric flex bonus in period detection. Returns: Tuple of (result dict with periods, metadata dict) or (None, empty metadata) @@ -362,6 +365,8 @@ def _try_min_duration_fallback( threshold_volatility_very_high=config.threshold_volatility_very_high, level_filter=None, # Disable level filter gap_count=config.gap_count, + extend_to_extreme=config.extend_to_extreme, + max_extension_intervals=config.max_extension_intervals, ) # Try to find periods for days with zero periods @@ -375,6 +380,7 @@ def _try_min_duration_fallback( day_prices, config=fallback_config, time=time, + day_patterns_by_date=day_patterns_by_date, ) day_periods = day_result.get("periods", []) @@ -813,6 +819,7 @@ def calculate_periods_with_relaxation( existing_periods=all_periods, prices_by_day=prices_by_day, time=time, + day_patterns_by_date=day_patterns_by_date, ) if fallback_result: diff --git a/custom_components/tibber_prices/coordinator/period_handlers/shape_extension.py b/custom_components/tibber_prices/coordinator/period_handlers/shape_extension.py index 9389401..2d294b1 100644 --- a/custom_components/tibber_prices/coordinator/period_handlers/shape_extension.py +++ b/custom_components/tibber_prices/coordinator/period_handlers/shape_extension.py @@ -27,6 +27,7 @@ from typing import TYPE_CHECKING, Any from custom_components.tibber_prices.const import ( PRICE_LEVEL_CHEAP, PRICE_LEVEL_EXPENSIVE, + PRICE_LEVEL_MAPPING, PRICE_LEVEL_VERY_CHEAP, PRICE_LEVEL_VERY_EXPENSIVE, ) @@ -161,6 +162,67 @@ def _walk_contiguous( return additions +def _fallback_blocked_by_majority( + intervals: list[dict[str, Any]], + primary_level: str, + fallback_level: str, +) -> bool: + """Return ``True`` when fallback extension should be suppressed. + + If *primary_level* intervals strictly outnumber *fallback_level* intervals + in the existing period, the period's character is predominantly primary. + Extending with *fallback_level* would dilute that character; the geometric + flex bonus of the core algorithm provides a better boundary in that case. + + Args: + intervals: Existing period interval list. + primary_level: Preferred level (``VERY_CHEAP`` / ``VERY_EXPENSIVE``). + fallback_level: Extension candidate level (``CHEAP`` / ``EXPENSIVE``). + + Returns: + ``True`` if fallback extension should be blocked. + + """ + primary_count = sum(1 for iv in intervals if iv.get("level") == primary_level) + fallback_count = sum(1 for iv in intervals if iv.get("level") == fallback_level) + return primary_count > fallback_count + + +def _is_spike_adjacent( + beyond_iv: dict[str, Any] | None, + fallback_level: str, + reverse_sort: bool, +) -> bool: + """Return ``True`` when the interval just outside the extension is a spike. + + If the interval immediately beyond the last collected fallback extension is + "worse" than *fallback_level* (more expensive for best-price, cheaper for + peak-price), the extension intervals form a ramp leading into a spike and + should be discarded. + + Args: + beyond_iv: Interval dict just outside the collected extension, or ``None``. + fallback_level: The level used for the fallback extension. + reverse_sort: ``True`` for peak-price, ``False`` for best-price. + + Returns: + ``True`` if the extension should be dropped. + + """ + if beyond_iv is None: + return False + beyond_level = beyond_iv.get("level") + if beyond_level is None: + return False + fallback_value = PRICE_LEVEL_MAPPING.get(fallback_level, 0) + beyond_value = PRICE_LEVEL_MAPPING.get(beyond_level, 0) + if reverse_sort: + # Peak: "worse" means cheaper than the extension level + return beyond_value < fallback_value + # Best: "worse" means more expensive than the extension level + return beyond_value > fallback_value + + def _extend_period_edges( period: dict[str, Any], interval_index: dict[datetime, dict[str, Any]], @@ -200,28 +262,55 @@ def _extend_period_edges( # ``end`` is the exclusive boundary: the last included interval starts at # ``end - _INTERVAL_DURATION``. + reverse_sort = primary_level == PRICE_LEVEL_VERY_EXPENSIVE backward_step = -_INTERVAL_DURATION forward_step = _INTERVAL_DURATION + # Collect original intervals early – needed for the majority gate below. + original_intervals = _collect_original_intervals(start, end, interval_index) + # ── walk LEFT (earlier than period start) ───────────────────────────────── left_cursor = start - _INTERVAL_DURATION left_additions = _walk_contiguous(interval_index, left_cursor, backward_step, primary_level, max_intervals) + left_used_fallback = False if not left_additions: - # Fallback: no primary-level neighbours on this side → try fallback level - left_additions = _walk_contiguous(interval_index, left_cursor, backward_step, fallback_level, max_intervals) + # Fallback: only if the period interior is not predominantly primary_level. + # When primary_level (e.g. VERY_CHEAP) strictly outnumbers fallback_level + # (e.g. CHEAP) inside the period, adding fallback edges dilutes the + # period's character. Rely on the geometric flex bonus instead. + if not _fallback_blocked_by_majority(original_intervals, primary_level, fallback_level): + left_additions = _walk_contiguous(interval_index, left_cursor, backward_step, fallback_level, max_intervals) + left_used_fallback = bool(left_additions) + + # Look-beyond guard (fallback only): if the interval immediately outside the + # collected extensions is worse than fallback_level (e.g. a price spike just + # before a run of CHEAP intervals), those intervals form a ramp into the spike + # and should not be included. + if left_used_fallback: + one_beyond_left = start - _INTERVAL_DURATION * (len(left_additions) + 1) + if _is_spike_adjacent(interval_index.get(one_beyond_left), fallback_level, reverse_sort): + left_additions = [] # ── walk RIGHT (later than period end) ──────────────────────────────────── right_additions = _walk_contiguous(interval_index, end, forward_step, primary_level, max_intervals) + right_used_fallback = False if not right_additions: - # Fallback: no primary-level neighbours on this side → try fallback level - right_additions = _walk_contiguous(interval_index, end, forward_step, fallback_level, max_intervals) + # Fallback: same majority gate as left side. + if not _fallback_blocked_by_majority(original_intervals, primary_level, fallback_level): + right_additions = _walk_contiguous(interval_index, end, forward_step, fallback_level, max_intervals) + right_used_fallback = bool(right_additions) + + # Look-beyond guard (fallback only). + if right_used_fallback: + one_beyond_right = end + _INTERVAL_DURATION * len(right_additions) + if _is_spike_adjacent(interval_index.get(one_beyond_right), fallback_level, reverse_sort): + right_additions = [] total_added = len(left_additions) + len(right_additions) if total_added == 0: return period # ── rebuild full interval list for the extended period ──────────────────── - original_intervals = _collect_original_intervals(start, end, interval_index) all_period_intervals = left_additions + original_intervals + right_additions # ── recalculate boundaries ──────────────────────────────────────────────── @@ -256,7 +345,6 @@ def _extend_period_edges( cv_pct = round(statistics.stdev(prices_for_vol) / mean_p * 100, 1) # ── assemble updated period dict (keep structural fields, update statistics) ─ - reverse_sort = primary_level == PRICE_LEVEL_VERY_EXPENSIVE updated: dict[str, Any] = { **period, # Time fields