refactor(day_pattern): enhance pattern classification with price boundaries

Refactor the pattern classification logic to include start and end prices for better accuracy in identifying day patterns. This change improves the classification of price patterns, particularly for cases involving valleys and peaks.

Impact: Users will experience more accurate price pattern classifications, leading to better decision-making based on price trends.
This commit is contained in:
Julian Pawlowski 2026-04-17 14:02:02 +00:00
parent 2092d28ece
commit ba3e127ac7
4 changed files with 173 additions and 20 deletions

View file

@ -172,7 +172,13 @@ def _detect_single_day_pattern(
extrema = _find_significant_extrema(smoothed, min_amplitude=price_span * MIN_EXTREMUM_AMPLITUDE_RATIO) extrema = _find_significant_extrema(smoothed, min_amplitude=price_span * MIN_EXTREMUM_AMPLITUDE_RATIO)
# ── classify pattern ──────────────────────────────────────────────────────── # ── classify pattern ────────────────────────────────────────────────────────
pattern, confidence = _classify_pattern(extrema, cv_pct, times) pattern, confidence = _classify_pattern(
extrema,
cv_pct,
times,
start_price=smoothed[0],
end_price=smoothed[-1],
)
# ── knee points + primary extreme time ───────────────────────────────────── # ── knee points + primary extreme time ─────────────────────────────────────
extreme_time: datetime | None = None extreme_time: datetime | None = None
@ -208,6 +214,24 @@ def _detect_single_day_pattern(
if max_extrema: if max_extrema:
primary = max(max_extrema, key=lambda e: e["price"]) primary = max(max_extrema, key=lambda e: e["price"])
extreme_time = times[primary["idx"]] if primary["idx"] < len(times) else None extreme_time = times[primary["idx"]] if primary["idx"] < len(times) else None
# The valley between the two peaks is the cheap zone for best-price periods.
# Compute knee points around the deepest minimum so that compute_geometric_flex_bonus
# can apply extra flex to intervals in this zone (same mechanism as VALLEY).
min_extrema_dp = [e for e in extrema if e["type"] == "min"]
if min_extrema_dp:
valley_extreme = min(min_extrema_dp, key=lambda e: e["price"])
lk, rk = _find_knee_points(smoothed, valley_extreme["idx"])
valley_start = times[lk] if lk is not None and lk < len(times) else None
valley_end = times[rk] if rk is not None and rk < len(times) else None
# The valley between the two peaks is the cheap zone for best-price periods.
# Compute knee points around the deepest minimum so that compute_geometric_flex_bonus
# can apply extra flex to intervals inside this zone (same mechanism as VALLEY).
min_extrema_dp = [e for e in extrema if e["type"] == "min"]
if min_extrema_dp:
valley_extreme = min(min_extrema_dp, key=lambda e: e["price"])
lk, rk = _find_knee_points(smoothed, valley_extreme["idx"])
valley_start = times[lk] if lk is not None and lk < len(times) else None
valley_end = times[rk] if rk is not None and rk < len(times) else None
# ── intra-day segments ────────────────────────────────────────────────────── # ── intra-day segments ──────────────────────────────────────────────────────
segments = _detect_segments(extrema, prices_raw, times) segments = _detect_segments(extrema, prices_raw, times)
@ -278,24 +302,41 @@ def _find_significant_extrema(
return [] return []
# ── raw local extrema (strict local min/max) ──────────────────────────────── # ── raw local extrema (strict local min/max) ────────────────────────────────
# NOTE: We intentionally do NOT require the extremum to be below/above the
# day's start and end prices. That check was too restrictive for solar-
# influenced days (spring/summer) where overnight prices are as cheap as the
# midday valley, causing the midday dip to go undetected. The amplitude/
# prominence filter below is sufficient to suppress noise.
candidates: list[dict[str, Any]] = [] candidates: list[dict[str, Any]] = []
for i in range(1, n - 1): for i in range(1, n - 1):
prev_p = smoothed[i - 1] prev_p = smoothed[i - 1]
cur_p = smoothed[i] cur_p = smoothed[i]
next_p = smoothed[i + 1] next_p = smoothed[i + 1]
if cur_p <= prev_p and cur_p <= next_p and cur_p < smoothed[0] and cur_p < smoothed[-1]: if cur_p <= prev_p and cur_p <= next_p:
candidates.append({"idx": i, "type": "min", "price": cur_p}) candidates.append({"idx": i, "type": "min", "price": cur_p})
elif cur_p >= prev_p and cur_p >= next_p and cur_p > smoothed[0] and cur_p > smoothed[-1]: elif cur_p >= prev_p and cur_p >= next_p:
candidates.append({"idx": i, "type": "max", "price": cur_p}) candidates.append({"idx": i, "type": "max", "price": cur_p})
if not candidates: if not candidates:
return [] return []
# ── amplitude filter ──────────────────────────────────────────────────────── # ── amplitude filter ────────────────────────────────────────────────────────
# For each candidate, compute prominence = distance to the nearest extremum # For each candidate, measure prominence against the most representative
# of opposite type (or the global opposite extreme if none exist). # reference price available.
# We use a simpler heuristic: compare against the mean of its two flanking #
# values in the smoothed series (one window radius on each side). # Problem with pure local-neighbourhood mean: a broad, flat-bottomed valley
# (e.g. a 5-hour cheap midday zone) pulls the neighbourhood mean down toward
# the valley price itself, making the prominence appear near-zero even though
# the valley is clearly significant on the full day.
#
# Solution: use max(local_mean, day_mean) for minima and min(local_mean,
# day_mean) for maxima. This picks the reference that gives the LARGEST
# separation for genuine extrema:
# - Deep/broad valley: local_mean ≈ valley price → day_mean wins (higher).
# - Overnight plateau max: local_mean ≈ plateau price → day_mean wins (lower).
# - Sharp isolated spike: local_mean already high → day_mean may be lower,
# but the spike still has large prominence either way.
day_mean = sum(smoothed) / len(smoothed)
significant: list[dict[str, Any]] = [] significant: list[dict[str, Any]] = []
for cand in candidates: for cand in candidates:
idx = cand["idx"] idx = cand["idx"]
@ -303,11 +344,12 @@ def _find_significant_extrema(
lo = max(0, idx - hw) lo = max(0, idx - hw)
hi = min(n, idx + hw + 1) hi = min(n, idx + hw + 1)
neighbourhood = smoothed[lo:hi] neighbourhood = smoothed[lo:hi]
local_mean = sum(neighbourhood) / len(neighbourhood)
if cand["type"] == "min": if cand["type"] == "min":
reference = sum(neighbourhood) / len(neighbourhood) reference = max(local_mean, day_mean) # broad valley: day_mean dominates
prominence = reference - cand["price"] prominence = reference - cand["price"]
else: else:
reference = sum(neighbourhood) / len(neighbourhood) reference = min(local_mean, day_mean) # plateau max: day_mean dominates
prominence = cand["price"] - reference prominence = cand["price"] - reference
if prominence >= min_amplitude * 0.8: # slight tolerance on the threshold if prominence >= min_amplitude * 0.8: # slight tolerance on the threshold
significant.append(cand) significant.append(cand)
@ -348,6 +390,8 @@ def _classify_pattern(
extrema: list[dict[str, Any]], extrema: list[dict[str, Any]],
cv_pct: float, cv_pct: float,
times: list[datetime], times: list[datetime],
start_price: float = 0.0,
end_price: float = 0.0,
) -> tuple[str, float]: ) -> tuple[str, float]:
""" """
Classify the day into a pattern string and confidence score (0-1). Classify the day into a pattern string and confidence score (0-1).
@ -356,6 +400,8 @@ def _classify_pattern(
extrema: List of significant extrema (already deduplicated). extrema: List of significant extrema (already deduplicated).
cv_pct: Coefficient of variation for the day (%). cv_pct: Coefficient of variation for the day (%).
times: Timestamps of all intervals (for position calculations). times: Timestamps of all intervals (for position calculations).
start_price: Smoothed price of the first interval (day start).
end_price: Smoothed price of the last interval (day end).
Returns: Returns:
(pattern_string, confidence_float) (pattern_string, confidence_float)
@ -401,8 +447,18 @@ def _classify_pattern(
# ── two extrema ───────────────────────────────────────────────────────────── # ── two extrema ─────────────────────────────────────────────────────────────
if n_extrema == 2: if n_extrema == 2:
if types == ["max", "min"]: if types == ["max", "min"]:
# Check if max is above both endpoints → genuine interior peak
max_price = extrema[0]["price"]
if start_price > 0 and end_price > 0 and max_price > start_price and max_price > end_price:
return DAY_PATTERN_PEAK, 0.65
return DAY_PATTERN_FALLING, 0.7 return DAY_PATTERN_FALLING, 0.7
if types == ["min", "max"]: if types == ["min", "max"]:
# Check if min is below both endpoints → genuine interior valley
# (avoids misclassifying as RISING a day that starts/ends expensive
# but has a cheap midday zone, e.g. spring solar duck-curve).
min_price = extrema[0]["price"]
if start_price > 0 and end_price > 0 and min_price < start_price and min_price < end_price:
return DAY_PATTERN_VALLEY, 0.65
return DAY_PATTERN_RISING, 0.7 return DAY_PATTERN_RISING, 0.7
if types == ["min", "min"]: if types == ["min", "min"]:
return DAY_PATTERN_DOUBLE_VALLEY, 0.65 return DAY_PATTERN_DOUBLE_VALLEY, 0.65

View file

@ -282,8 +282,10 @@ def compute_geometric_flex_bonus(
zone_start = day_pattern.get("peak_start") zone_start = day_pattern.get("peak_start")
zone_end = day_pattern.get("peak_end") zone_end = day_pattern.get("peak_end")
else: else:
# Best price: expand inside VALLEY (V/U-shape) zone # Best price: expand inside VALLEY zone.
if pattern != "valley": # Also handles DOUBLE_PEAK (solar duck-curve: expensive morning/evening, cheap midday)
# where valley_start/valley_end mark the knee points around the midday minimum.
if pattern not in ("valley", "double_peak"):
return 0.0 return 0.0
zone_start = day_pattern.get("valley_start") zone_start = day_pattern.get("valley_start")
zone_end = day_pattern.get("valley_end") zone_end = day_pattern.get("valley_end")

View file

@ -284,6 +284,7 @@ def _try_min_duration_fallback(
existing_periods: list[dict], existing_periods: list[dict],
prices_by_day: dict[date, list[dict]], prices_by_day: dict[date, list[dict]],
time: TibberPricesTimeService, time: TibberPricesTimeService,
day_patterns_by_date: dict | None = None,
) -> tuple[dict[str, Any] | None, dict[str, Any]]: ) -> tuple[dict[str, Any] | None, dict[str, Any]]:
""" """
Try reducing min_period_length to find periods when relaxation is exhausted. Try reducing min_period_length to find periods when relaxation is exhausted.
@ -303,6 +304,8 @@ def _try_min_duration_fallback(
existing_periods: Periods found so far (from relaxation) existing_periods: Periods found so far (from relaxation)
prices_by_day: Price intervals grouped by day prices_by_day: Price intervals grouped by day
time: Time service instance time: Time service instance
day_patterns_by_date: Optional dict mapping each date to its day pattern dict. Used for
geometric flex bonus in period detection.
Returns: Returns:
Tuple of (result dict with periods, metadata dict) or (None, empty metadata) Tuple of (result dict with periods, metadata dict) or (None, empty metadata)
@ -362,6 +365,8 @@ def _try_min_duration_fallback(
threshold_volatility_very_high=config.threshold_volatility_very_high, threshold_volatility_very_high=config.threshold_volatility_very_high,
level_filter=None, # Disable level filter level_filter=None, # Disable level filter
gap_count=config.gap_count, gap_count=config.gap_count,
extend_to_extreme=config.extend_to_extreme,
max_extension_intervals=config.max_extension_intervals,
) )
# Try to find periods for days with zero periods # Try to find periods for days with zero periods
@ -375,6 +380,7 @@ def _try_min_duration_fallback(
day_prices, day_prices,
config=fallback_config, config=fallback_config,
time=time, time=time,
day_patterns_by_date=day_patterns_by_date,
) )
day_periods = day_result.get("periods", []) day_periods = day_result.get("periods", [])
@ -813,6 +819,7 @@ def calculate_periods_with_relaxation(
existing_periods=all_periods, existing_periods=all_periods,
prices_by_day=prices_by_day, prices_by_day=prices_by_day,
time=time, time=time,
day_patterns_by_date=day_patterns_by_date,
) )
if fallback_result: if fallback_result:

View file

@ -27,6 +27,7 @@ from typing import TYPE_CHECKING, Any
from custom_components.tibber_prices.const import ( from custom_components.tibber_prices.const import (
PRICE_LEVEL_CHEAP, PRICE_LEVEL_CHEAP,
PRICE_LEVEL_EXPENSIVE, PRICE_LEVEL_EXPENSIVE,
PRICE_LEVEL_MAPPING,
PRICE_LEVEL_VERY_CHEAP, PRICE_LEVEL_VERY_CHEAP,
PRICE_LEVEL_VERY_EXPENSIVE, PRICE_LEVEL_VERY_EXPENSIVE,
) )
@ -161,6 +162,67 @@ def _walk_contiguous(
return additions return additions
def _fallback_blocked_by_majority(
intervals: list[dict[str, Any]],
primary_level: str,
fallback_level: str,
) -> bool:
"""Return ``True`` when fallback extension should be suppressed.
If *primary_level* intervals strictly outnumber *fallback_level* intervals
in the existing period, the period's character is predominantly primary.
Extending with *fallback_level* would dilute that character; the geometric
flex bonus of the core algorithm provides a better boundary in that case.
Args:
intervals: Existing period interval list.
primary_level: Preferred level (``VERY_CHEAP`` / ``VERY_EXPENSIVE``).
fallback_level: Extension candidate level (``CHEAP`` / ``EXPENSIVE``).
Returns:
``True`` if fallback extension should be blocked.
"""
primary_count = sum(1 for iv in intervals if iv.get("level") == primary_level)
fallback_count = sum(1 for iv in intervals if iv.get("level") == fallback_level)
return primary_count > fallback_count
def _is_spike_adjacent(
beyond_iv: dict[str, Any] | None,
fallback_level: str,
reverse_sort: bool,
) -> bool:
"""Return ``True`` when the interval just outside the extension is a spike.
If the interval immediately beyond the last collected fallback extension is
"worse" than *fallback_level* (more expensive for best-price, cheaper for
peak-price), the extension intervals form a ramp leading into a spike and
should be discarded.
Args:
beyond_iv: Interval dict just outside the collected extension, or ``None``.
fallback_level: The level used for the fallback extension.
reverse_sort: ``True`` for peak-price, ``False`` for best-price.
Returns:
``True`` if the extension should be dropped.
"""
if beyond_iv is None:
return False
beyond_level = beyond_iv.get("level")
if beyond_level is None:
return False
fallback_value = PRICE_LEVEL_MAPPING.get(fallback_level, 0)
beyond_value = PRICE_LEVEL_MAPPING.get(beyond_level, 0)
if reverse_sort:
# Peak: "worse" means cheaper than the extension level
return beyond_value < fallback_value
# Best: "worse" means more expensive than the extension level
return beyond_value > fallback_value
def _extend_period_edges( def _extend_period_edges(
period: dict[str, Any], period: dict[str, Any],
interval_index: dict[datetime, dict[str, Any]], interval_index: dict[datetime, dict[str, Any]],
@ -200,28 +262,55 @@ def _extend_period_edges(
# ``end`` is the exclusive boundary: the last included interval starts at # ``end`` is the exclusive boundary: the last included interval starts at
# ``end - _INTERVAL_DURATION``. # ``end - _INTERVAL_DURATION``.
reverse_sort = primary_level == PRICE_LEVEL_VERY_EXPENSIVE
backward_step = -_INTERVAL_DURATION backward_step = -_INTERVAL_DURATION
forward_step = _INTERVAL_DURATION forward_step = _INTERVAL_DURATION
# Collect original intervals early; they are needed for the majority gate below.
original_intervals = _collect_original_intervals(start, end, interval_index)
# ── walk LEFT (earlier than period start) ───────────────────────────────── # ── walk LEFT (earlier than period start) ─────────────────────────────────
left_cursor = start - _INTERVAL_DURATION left_cursor = start - _INTERVAL_DURATION
left_additions = _walk_contiguous(interval_index, left_cursor, backward_step, primary_level, max_intervals) left_additions = _walk_contiguous(interval_index, left_cursor, backward_step, primary_level, max_intervals)
left_used_fallback = False
if not left_additions: if not left_additions:
# Fallback: no primary-level neighbours on this side → try fallback level # Fallback: only if the period interior is not predominantly primary_level.
# When primary_level (e.g. VERY_CHEAP) strictly outnumbers fallback_level
# (e.g. CHEAP) inside the period, adding fallback edges dilutes the
# period's character. Rely on the geometric flex bonus instead.
if not _fallback_blocked_by_majority(original_intervals, primary_level, fallback_level):
left_additions = _walk_contiguous(interval_index, left_cursor, backward_step, fallback_level, max_intervals) left_additions = _walk_contiguous(interval_index, left_cursor, backward_step, fallback_level, max_intervals)
left_used_fallback = bool(left_additions)
# Look-beyond guard (fallback only): if the interval immediately outside the
# collected extensions is worse than fallback_level (e.g. a price spike just
# before a run of CHEAP intervals), those intervals form a ramp into the spike
# and should not be included.
if left_used_fallback:
one_beyond_left = start - _INTERVAL_DURATION * (len(left_additions) + 1)
if _is_spike_adjacent(interval_index.get(one_beyond_left), fallback_level, reverse_sort):
left_additions = []
# ── walk RIGHT (later than period end) ──────────────────────────────────── # ── walk RIGHT (later than period end) ────────────────────────────────────
right_additions = _walk_contiguous(interval_index, end, forward_step, primary_level, max_intervals) right_additions = _walk_contiguous(interval_index, end, forward_step, primary_level, max_intervals)
right_used_fallback = False
if not right_additions: if not right_additions:
# Fallback: no primary-level neighbours on this side → try fallback level # Fallback: same majority gate as left side.
if not _fallback_blocked_by_majority(original_intervals, primary_level, fallback_level):
right_additions = _walk_contiguous(interval_index, end, forward_step, fallback_level, max_intervals) right_additions = _walk_contiguous(interval_index, end, forward_step, fallback_level, max_intervals)
right_used_fallback = bool(right_additions)
# Look-beyond guard (fallback only).
if right_used_fallback:
one_beyond_right = end + _INTERVAL_DURATION * len(right_additions)
if _is_spike_adjacent(interval_index.get(one_beyond_right), fallback_level, reverse_sort):
right_additions = []
total_added = len(left_additions) + len(right_additions) total_added = len(left_additions) + len(right_additions)
if total_added == 0: if total_added == 0:
return period return period
# ── rebuild full interval list for the extended period ──────────────────── # ── rebuild full interval list for the extended period ────────────────────
original_intervals = _collect_original_intervals(start, end, interval_index)
all_period_intervals = left_additions + original_intervals + right_additions all_period_intervals = left_additions + original_intervals + right_additions
# ── recalculate boundaries ──────────────────────────────────────────────── # ── recalculate boundaries ────────────────────────────────────────────────
@ -256,7 +345,6 @@ def _extend_period_edges(
cv_pct = round(statistics.stdev(prices_for_vol) / mean_p * 100, 1) cv_pct = round(statistics.stdev(prices_for_vol) / mean_p * 100, 1)
# ── assemble updated period dict (keep structural fields, update statistics) ─ # ── assemble updated period dict (keep structural fields, update statistics) ─
reverse_sort = primary_level == PRICE_LEVEL_VERY_EXPENSIVE
updated: dict[str, Any] = { updated: dict[str, Any] = {
**period, **period,
# Time fields # Time fields