feat(periods): implement geometric_extension_attempted flag and time_range filtering

Phase 3: When geometric bonus intervals cause CV gate failure, strip them
from period edges (unextended boundaries) and set geometric_extension_attempted=True
on the summary. Previously only geometric_extension_active was tracked.
Moved LOW_PRICE_QUALITY_BYPASS_THRESHOLD constant to types.py for shared access.

Phase 4: Add time_range: tuple[datetime, datetime] | None parameter to
build_periods(), calculate_periods(), and calculate_periods_with_relaxation().
Filters candidate intervals to [start, end) without affecting day-wide reference
prices. Refactored _apply_segment_forcing() to use time_range instead of the
restricted_prices list approach.

Impact: Period statistics now accurately reflect when geometric flex extension
was attempted but reverted due to quality gate failure. Segment forcing uses
a cleaner API that preserves full price context for reference calculations.
This commit is contained in:
Julian Pawlowski 2026-04-12 08:24:25 +00:00
parent 4ddd19b132
commit 796eb4b422
5 changed files with 297 additions and 14 deletions

View file

@ -5,6 +5,8 @@ from __future__ import annotations
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from datetime import datetime
from custom_components.tibber_prices.coordinator.time_service import TibberPricesTimeService
from .types import TibberPricesPeriodConfig
@ -31,6 +33,7 @@ from .types import TibberPricesThresholdConfig
# Flex limits to prevent degenerate behavior (see docs/development/period-calculation-theory.md)
MAX_SAFE_FLEX = 0.50 # 50% - hard cap: above this, period detection becomes unreliable
MAX_OUTLIER_FLEX = 0.25 # 25% - cap for outlier filtering: above this, spike detection too permissive
MIN_SEGMENT_FORCING_INTERVALS = 8 # Minimum intervals per day half to attempt segment forcing (< 2 hours is too few)
def calculate_periods(
@ -39,6 +42,7 @@ def calculate_periods(
config: TibberPricesPeriodConfig,
time: TibberPricesTimeService,
day_patterns_by_date: dict | None = None,
time_range: tuple[datetime, datetime] | None = None,
) -> dict[str, Any]:
"""
Calculate price periods (best or peak) from price data.
@ -61,6 +65,9 @@ def calculate_periods(
min_period_length, threshold_low, and threshold_high.
time: TibberPricesTimeService instance (required).
day_patterns_by_date: Optional dict mapping date day pattern dict for geometric flex bonus.
time_range: Optional (start_inclusive, end_exclusive) window passed through to
build_periods(). When set, only intervals within [start, end) are considered
as period candidates. Used by Phase 4 segment forcing.
Returns:
Dict with:
@ -168,6 +175,7 @@ def calculate_periods(
level_filter=config.level_filter,
gap_count=config.gap_count,
time=time,
time_range=time_range,
)
_LOGGER.debug(
@ -178,6 +186,24 @@ def calculate_periods(
config.level_filter or "None",
)
# Step 3.5: Segment forcing for W/M-shaped days (opt-in, default disabled)
# For days detected as W-shape (DOUBLE_VALLEY for best) or M-shape (DOUBLE_PEAK for peak),
# ensures each price valley/peak segment has at least segment_min_periods periods.
if config.segment_forcing and day_patterns_by_date:
raw_periods = _apply_segment_forcing(
all_prices_smoothed,
raw_periods,
price_context,
config,
day_patterns_by_date=day_patterns_by_date,
time=time,
)
_LOGGER.debug(
"%sAfter segment_forcing: %d periods total",
INDENT_L0,
len(raw_periods),
)
# Step 4: Filter by minimum length
raw_periods = filter_periods_by_min_length(raw_periods, min_period_length, time=time)
_LOGGER.debug(
@ -264,3 +290,168 @@ def calculate_periods(
"avg_prices": {k.isoformat(): v for k, v in avg_price_by_day.items()},
},
}
# ─── Segment forcing helpers ──────────────────────────────────────────────────
def _period_belongs_to_side(
period: list[dict],
side_times: set,
time: "TibberPricesTimeService",
) -> bool:
"""Return True if the majority of a period's intervals are in side_times."""
if not period:
return False
in_side = sum(1 for iv in period if time.get_interval_time(iv) in side_times)
return in_side * 2 >= len(period)
def _apply_segment_forcing( # noqa: PLR0913
all_prices_smoothed: list[dict],
periods: list[list[dict]],
price_context: dict[str, Any],
config: "TibberPricesPeriodConfig",
*,
day_patterns_by_date: dict,
time: "TibberPricesTimeService",
) -> list[list[dict]]:
"""
Force at least segment_min_periods periods per segment for W/M-shaped days.
For DOUBLE_VALLEY days (best price): splits at the central price peak and
ensures each valley side has the required number of periods.
For DOUBLE_PEAK days (peak price): splits at the central price valley and
ensures each peak side has the required number of periods.
Args:
all_prices_smoothed: Outlier-filtered prices used for period building.
periods: Already-found periods from the global build_periods call.
price_context: Context dict with reference/average prices + filter settings.
config: Period configuration including segment_forcing parameters.
day_patterns_by_date: Detected day patterns keyed by date.
time: TibberPricesTimeService instance.
Returns:
Updated periods list with any new segment-forced periods appended.
"""
import logging # noqa: PLC0415
from .period_building import build_periods # noqa: PLC0415
from .types import DAY_PATTERN_DOUBLE_PEAK, DAY_PATTERN_DOUBLE_VALLEY, INDENT_L1, INDENT_L2 # noqa: PLC0415
_LOGGER = logging.getLogger(__name__) # noqa: N806
reverse_sort = config.reverse_sort
target_pattern = DAY_PATTERN_DOUBLE_PEAK if reverse_sort else DAY_PATTERN_DOUBLE_VALLEY
segment_min_periods = config.segment_min_periods
merged_periods = list(periods)
for day_date, day_pattern in day_patterns_by_date.items():
if day_pattern is None or day_pattern.get("pattern") != target_pattern:
continue
# Collect and sort this day's intervals
day_intervals = sorted(
(
iv
for iv in all_prices_smoothed
if (t := time.get_interval_time(iv)) is not None and t.date() == day_date
),
key=time.get_interval_time, # type: ignore[arg-type]
)
if len(day_intervals) < MIN_SEGMENT_FORCING_INTERVALS: # need at least a few intervals per segment
continue
# Find the central extremum in the middle 50% of the day
# DOUBLE_VALLEY → central peak = highest price between the two valleys
# DOUBLE_PEAK → central valley = lowest price between the two peaks
n = len(day_intervals)
middle = day_intervals[n // 4 : 3 * n // 4]
if not middle:
continue
if not reverse_sort:
split_iv = max(middle, key=lambda iv: iv.get("total") or 0)
else:
split_iv = min(middle, key=lambda iv: iv.get("total") or float("inf"))
split_time = time.get_interval_time(split_iv)
if split_time is None:
continue
side_a = [iv for iv in day_intervals if (t := time.get_interval_time(iv)) is not None and t <= split_time]
side_b = [iv for iv in day_intervals if (t := time.get_interval_time(iv)) is not None and t > split_time]
_LOGGER.debug(
"%sSegment forcing %s (%s): split at %s (%d+%d intervals)",
INDENT_L1,
day_date,
target_pattern,
split_time.strftime("%H:%M"),
len(side_a),
len(side_b),
)
for side_name, side_intervals in (("A", side_a), ("B", side_b)):
side_times = {time.get_interval_time(iv) for iv in side_intervals}
count_in_side = sum(1 for p in merged_periods if _period_belongs_to_side(p, side_times, time))
_LOGGER.debug(
"%sSide %s: %d existing periods (need %d)",
INDENT_L2,
side_name,
count_in_side,
segment_min_periods,
)
if count_in_side >= segment_min_periods:
continue
# Run period detection restricted to this segment side via time_range.
# The full all_prices_smoothed (including other days) is passed so that
# reference price context remains day-wide; time_range restricts which
# intervals are EVALUATED as period candidates to this side only.
sorted_side = sorted(side_intervals, key=time.get_interval_time) # type: ignore[arg-type]
side_start = time.get_interval_time(sorted_side[0])
# end = one interval duration past the last interval's start
side_end = time.get_interval_time(sorted_side[-1])
if side_start is None or side_end is None:
continue
side_end = side_end + time.get_interval_duration()
new_raw = build_periods(
all_prices_smoothed,
price_context,
reverse_sort=reverse_sort,
level_filter=config.level_filter,
gap_count=config.gap_count,
time=time,
time_range=(side_start, side_end),
)
# Add non-duplicate periods; flag them with segment_forced=True
added = 0
for new_period in new_raw:
new_times = {time.get_interval_time(iv) for iv in new_period if time.get_interval_time(iv) is not None}
is_dup = any(
bool(
new_times
& {time.get_interval_time(iv) for iv in existing if time.get_interval_time(iv) is not None}
)
for existing in merged_periods
)
if not is_dup:
merged_periods.append([{**iv, "segment_forced": True} for iv in new_period])
added += 1
_LOGGER.debug(
"%sSide %s: added %d forced periods (%d candidates from restricted run)",
INDENT_L2,
side_name,
added,
len(new_raw),
)
return merged_periods

View file

@ -62,6 +62,7 @@ def build_periods( # noqa: PLR0913, PLR0915, PLR0912 - Complex period building
level_filter: str | None = None,
gap_count: int = 0,
time: TibberPricesTimeService,
time_range: tuple[datetime, datetime] | None = None,
) -> list[list[dict]]:
"""
Build periods, allowing periods to cross midnight (day boundary).
@ -78,6 +79,10 @@ def build_periods( # noqa: PLR0913, PLR0915, PLR0912 - Complex period building
level_filter: Level filter string ("cheap", "expensive", "any", None)
gap_count: Number of allowed consecutive intervals deviating by exactly 1 level step
time: TibberPricesTimeService instance (required)
time_range: Optional (start_inclusive, end_exclusive) window. When set, only intervals
within [start, end) are considered as period candidates. Reference prices
(from price_context) remain day-wide and are unaffected by this filter.
Used by Phase 4 segment forcing to restrict detection to one segment side.
"""
ref_prices = price_context["ref_prices"]
@ -132,6 +137,11 @@ def build_periods( # noqa: PLR0913, PLR0915, PLR0912 - Complex period building
starts_at = time.get_interval_time(price_data)
if starts_at is None:
continue
# Filter by time range if specified (Phase 4 segment forcing)
if time_range is not None and not (time_range[0] <= starts_at < time_range[1]):
continue
date_key = starts_at.date()
# Use smoothed price for criteria checks (flex/distance)

View file

@ -23,6 +23,8 @@ from custom_components.tibber_prices.utils.price import (
calculate_volatility_level,
)
from .types import LOW_PRICE_QUALITY_BYPASS_THRESHOLD, PERIOD_MAX_CV
def calculate_period_price_diff(
price_mean: float,
@ -220,18 +222,57 @@ def build_period_summary_dict(
return summary
def _add_interval_flag_counts(summary: dict, period: list[dict]) -> None:
"""Add optional interval flag counts to period summary."""
def _strip_geo_from_edges(period: list[dict]) -> list[dict]:
"""
Remove geo-bonus intervals from leading and trailing edges of a period.
Used by Phase 3 CV gate: when a period with geometric extension fails the CV quality
gate, the edge intervals that were included only via geo-bonus flex are stripped to
restore the period's unextended (tighter) boundaries.
Geo-bonus intervals in the MIDDLE of a period are preserved (they represent
intervals genuinely inside the valley/peak zone, not boundary extensions).
Returns an empty list only when all intervals are geo-bonus (degenerate case).
"""
start = 0
end = len(period)
while start < end and period[start].get("geometric_bonus_applied", False):
start += 1
while end > start and period[end - 1].get("geometric_bonus_applied", False):
end -= 1
return period[start:end]
def _add_interval_flag_counts(summary: dict, period: list[dict], *, geo_extension_status: str | None = None) -> None:
"""
Add optional interval flag counts to period summary.
Args:
summary: Period summary dict to augment in-place.
period: Raw interval list (may already be stripped of geo-bonus edges).
geo_extension_status: "active" if geometric extension passed the CV gate,
"attempted" if it was tried but CV gate failed and period was reverted.
"""
if (count := sum(1 for i in period if i.get("smoothing_was_impactful", False))) > 0:
summary["period_interval_smoothed_count"] = count
if (count := sum(1 for i in period if i.get("is_level_gap", False))) > 0:
summary["period_interval_level_gap_count"] = count
if (count := sum(1 for i in period if i.get("geometric_bonus_applied", False))) > 0:
# Geometric extension: distinguish "active" (CV passed) from "attempted" (CV failed → reverted)
if geo_extension_status == "active":
count = sum(1 for i in period if i.get("geometric_bonus_applied", False))
summary["geometric_extension_active"] = True
summary["geometric_extension_intervals"] = count
elif geo_extension_status == "attempted":
# CV gate failed: geo extension was tried but period was reverted to base boundaries.
# The summary uses unextended (stripped) boundaries; this flag marks the attempt.
summary["geometric_extension_attempted"] = True
if any(i.get("segment_forced", False) for i in period):
summary["segment_forced"] = True
def extract_period_summaries(
def extract_period_summaries( # noqa: PLR0912, PLR0915 - CV pre-check for geo-extension adds necessary branches/statements
periods: list[list[dict]],
all_prices: list[dict],
price_context: dict[str, Any],
@ -280,6 +321,34 @@ def extract_period_summaries(
if not period:
continue
# Phase 3: Geometric extension CV gate check
# If this period contains geo-bonus intervals, pre-check whether the full period
# passes the CV quality gate. If it fails, revert to base boundaries by stripping
# geo-bonus intervals from the edges and mark with geometric_extension_attempted.
geo_extension_status: str | None = None
if any(iv.get("geometric_bonus_applied", False) for iv in period):
full_prices: list[float] = []
for iv in period:
start_iv = iv.get("interval_start")
if start_iv:
p = price_lookup.get(start_iv.isoformat())
if p:
full_prices.append(float(p["total"]))
if full_prices:
full_cv = calculate_coefficient_of_variation(full_prices)
cv_fails = (
full_cv is not None
and sum(full_prices) / len(full_prices) >= LOW_PRICE_QUALITY_BYPASS_THRESHOLD
and full_cv > PERIOD_MAX_CV
)
if cv_fails:
base_period = _strip_geo_from_edges(period)
if base_period:
period = base_period # noqa: PLW2901 - intentional period replacement
geo_extension_status = "attempted"
else:
geo_extension_status = "active"
first_interval = period[0]
last_interval = period[-1]
@ -369,7 +438,7 @@ def extract_period_summaries(
)
# Add optional interval flag counts (smoothing, level gaps, geometric extension)
_add_interval_flag_counts(summary, period)
_add_interval_flag_counts(summary, period, geo_extension_status=geo_extension_status)
summaries.append(summary)

View file

@ -8,7 +8,7 @@ from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from collections.abc import Callable
from datetime import date
from datetime import date, datetime
from custom_components.tibber_prices.coordinator.time_service import TibberPricesTimeService
@ -22,6 +22,7 @@ from .types import (
INDENT_L0,
INDENT_L1,
INDENT_L2,
LOW_PRICE_QUALITY_BYPASS_THRESHOLD,
PERIOD_MAX_CV,
TibberPricesPeriodConfig,
)
@ -41,12 +42,6 @@ FLEX_HIGH_THRESHOLD_RELAXATION = 0.30 # 30% - WARNING: base flex too high for r
MIN_DURATION_FALLBACK_MINIMUM = 30 # Minimum period length to try (30 min = 2 intervals)
MIN_DURATION_FALLBACK_STEP = 15 # Reduce by 15 min (1 interval) each step
# Low absolute price threshold for quality gate bypass (in major currency unit, e.g. EUR/NOK)
# When the MEAN price of a period is below this level, the CV quality gate is bypassed.
# Relative CV is unreliable at very low absolute prices: a range of 1-4 ct shows CV≈50%
# but is practically homogeneous from a cost perspective.
# Value: LOW_PRICE_AVG_THRESHOLD (subunit) / 100 = 10 ct / 100 = 0.10 EUR/NOK
LOW_PRICE_QUALITY_BYPASS_THRESHOLD = 0.10 # EUR/NOK major unit (= 10 ct/øre)
# Span-to-ref ratio threshold for suppressing flex warnings on V-shape days.
# When span / ref_price < this on ANY available day, the warning is shown.
@ -527,6 +522,7 @@ def calculate_periods_with_relaxation( # noqa: PLR0912, PLR0913, PLR0915 - Per-
time: TibberPricesTimeService,
config_entry: Any, # ConfigEntry type
day_patterns_by_date: dict | None = None,
time_range: tuple[datetime, datetime] | None = None,
) -> dict[str, Any]:
"""
Calculate periods with optional per-day filter relaxation.
@ -555,6 +551,9 @@ def calculate_periods_with_relaxation( # noqa: PLR0912, PLR0913, PLR0915 - Per-
config_entry: Config entry to get display unit configuration.
day_patterns_by_date: Optional dict mapping date day pattern dict. Used for
geometric flex bonus in period detection. Passed through to calculate_periods().
time_range: Optional (start_inclusive, end_exclusive) datetime window. When set,
only intervals within [start, end) are considered as period candidates.
Passed through to calculate_periods(). Used by Phase 4 segment forcing.
Returns:
Dict with same format as calculate_periods() output:
@ -712,7 +711,9 @@ def calculate_periods_with_relaxation( # noqa: PLR0912, PLR0913, PLR0915 - Per-
# === BASELINE CALCULATION (process ALL prices together, including yesterday) ===
# Periods that ended before yesterday will be filtered out later by filter_periods_by_end_date()
# This keeps yesterday/today/tomorrow periods in the cache
baseline_result = calculate_periods(all_prices, config=config, time=time, day_patterns_by_date=day_patterns_by_date)
baseline_result = calculate_periods(
all_prices, config=config, time=time, day_patterns_by_date=day_patterns_by_date, time_range=time_range
)
all_periods = baseline_result["periods"]
# Count periods per day for min_periods check
@ -955,7 +956,10 @@ def relax_all_prices( # noqa: PLR0913 - Comprehensive filter relaxation require
# Process ALL prices together (allows midnight crossing)
result = calculate_periods(
all_prices, config=relaxed_config, time=time, day_patterns_by_date=day_patterns_by_date
all_prices,
config=relaxed_config,
time=time,
day_patterns_by_date=day_patterns_by_date,
)
new_periods = result["periods"]

View file

@ -22,6 +22,13 @@ from custom_components.tibber_prices.const import (
# Period with prices 0.5-1.0 kr has ~30% CV which would be rejected
PERIOD_MAX_CV = 25.0 # 25% max coefficient of variation within a period
# Low absolute price threshold for quality gate bypass (in major currency unit, e.g. EUR/NOK)
# When the MEAN price of a period is below this level, the CV quality gate is bypassed.
# Relative CV is unreliable at very low absolute prices: a range of 1-4 ct shows CV≈50%
# but is practically homogeneous from a cost perspective.
# Value: 10 ct / 100 = 0.10 EUR/NOK
LOW_PRICE_QUALITY_BYPASS_THRESHOLD = 0.10 # EUR/NOK major unit (= 10 ct/øre)
# Cross-Day Extension: Time window constants
# When a period ends late in the day and tomorrow data is available,
# we can extend it past midnight if prices remain favorable
@ -59,6 +66,8 @@ class TibberPricesPeriodConfig(NamedTuple):
extend_to_extreme: bool = False # Extend periods into adjacent VERY_CHEAP/VERY_EXPENSIVE intervals
max_extension_intervals: int = 0 # Max intervals this extension may add per side (0 = disabled)
geometric_extra_flex: float = 0.0 # Extra flex (decimal) for intervals inside the valley/peak zone (0.0 = disabled)
segment_forcing: bool = False # Force at least segment_min_periods in each W/M-shape segment
segment_min_periods: int = 1 # Minimum periods required per segment when segment_forcing is True
class TibberPricesPeriodData(NamedTuple):