feat(periods): handle flat days and absolute low-price scenarios

Three complementary fixes for pathological price days:

1. Adaptive min_periods for flat days (CV ≤ 10%):
   On days with nearly uniform prices (e.g. solar surplus), enforcing
   multiple distinct cheap periods is geometrically impossible.
   _compute_day_effective_min() detects CV ≤ LOW_CV_FLAT_DAY_THRESHOLD
   and reduces the effective target to 1 for that day (best price only;
   peak price always runs full relaxation).

2. min_distance scaling on absolute low-price days:
   When the daily average drops below 0.10 EUR (10 ct), percentage-based
   min_distance becomes unreliable. The threshold is scaled linearly to
   zero so the filter neither accepts the entire day nor blocks everything.

3. CV quality gate bypass for absolute low-price periods:
   Periods with a mean below 0.10 EUR may show high relative CV even
   though the absolute price differences are fractions of a cent.
   Both _check_period_quality() and _check_merge_quality_gate() now
   bypass the CV gate below this threshold.

Additionally: span-aware flex warnings now emit INFO/WARNING when
base_flex >= 25%/30% and at least one "normal" (non-V-shape) day
exists (FLEX_WARNING_VSHAPE_RATIO = 0.5). Previously the constants
were defined but never used.

Updated 3 test assertions in test_best_price_e2e.py: the flat-day
fixture (CV ~5.4%) correctly produces 1 period, not 2.

Impact: Best Price periods now appear reliably on V-shape solar days
and flat-price days. No more "0 periods" on days where the single
cheapest window is a valid and useful result.
This commit is contained in:
Julian Pawlowski 2026-04-06 12:18:40 +00:00
parent 2699a4658d
commit 1e1c8d5299
4 changed files with 214 additions and 9 deletions

View file

@ -25,6 +25,14 @@ INDENT_L0 = "" # Entry point / main function
FLEX_SCALING_THRESHOLD = 0.20 # 20% - start adjusting min_distance FLEX_SCALING_THRESHOLD = 0.20 # 20% - start adjusting min_distance
SCALE_FACTOR_WARNING_THRESHOLD = 0.8 # Log when reduction > 20% SCALE_FACTOR_WARNING_THRESHOLD = 0.8 # Log when reduction > 20%
# Low absolute price threshold for min_distance scaling (in major currency unit, e.g. EUR/NOK)
# When the daily average price is below this, percentage-based min_distance becomes unreliable:
# even the daily minimum may not fall far enough below average in relative terms.
# Scale min_distance linearly to 0 as avg_price approaches 0.
# Value: 0.10 EUR/NOK = 10 ct/øre.
# At avg ≥ 0.10: full min_distance. At avg = 0.05: 50% min_distance. At avg = 0: 0%.
LOW_PRICE_AVG_THRESHOLD = 0.10 # EUR/NOK major unit (= 10 ct/øre in subunit)
def check_level_with_gap_tolerance( def check_level_with_gap_tolerance(
interval_level: int, interval_level: int,
@ -203,6 +211,23 @@ def check_interval_criteria(
scale_factor, scale_factor,
) )
# ============================================================
# ABSOLUTE LOW-PRICE SCALING: Reduce min_distance when avg is very low
# ============================================================
# Problem: On days where the entire price level is extremely low (e.g., avg=3 ct),
# even the daily minimum might not fall 5% below the average in relative terms.
# Example: avg=3 ct, min_distance=5% → threshold=2.85 ct.
# If min=2.9 ct, no interval qualifies despite being genuinely cheap.
#
# Solution: Scale min_distance linearly to 0 as avg_price approaches 0.
# At avg=10 ct → full min_distance; at avg=5 ct → 50%; at avg=0 ct → 0%.
#
# This is currency-agnostic: 10 ct EUR and 10 øre NOK are both
# "very low price territory" for their respective markets.
if criteria.avg_price < LOW_PRICE_AVG_THRESHOLD and LOW_PRICE_AVG_THRESHOLD > 0:
low_price_scale = max(0.0, criteria.avg_price / LOW_PRICE_AVG_THRESHOLD)
adjusted_min_distance = adjusted_min_distance * low_price_scale
# Calculate threshold from average (using normalized positive distance) # Calculate threshold from average (using normalized positive distance)
# - Peak price: threshold = avg * (1 + distance/100) → prices must be ABOVE avg+distance # - Peak price: threshold = avg * (1 + distance/100) → prices must be ABOVE avg+distance
# - Best price: threshold = avg * (1 - distance/100) → prices must be BELOW avg-distance # - Best price: threshold = avg * (1 - distance/100) → prices must be BELOW avg-distance

View file

@ -161,12 +161,27 @@ def _check_merge_quality_gate(periods_to_merge: list[tuple[int, dict]], relaxed:
Returns True if merge is allowed, False if blocked by Quality Gate. Returns True if merge is allowed, False if blocked by Quality Gate.
""" """
from .relaxation import LOW_PRICE_QUALITY_BYPASS_THRESHOLD # noqa: PLC0415
from .types import PERIOD_MAX_CV # noqa: PLC0415 from .types import PERIOD_MAX_CV # noqa: PLC0415
relaxed_start = relaxed["start"] relaxed_start = relaxed["start"]
relaxed_end = relaxed["end"] relaxed_end = relaxed["end"]
for _idx, existing in periods_to_merge: for _idx, existing in periods_to_merge:
# Low absolute price bypass: if the combined price level is very cheap,
# the quality gate is bypassed (same logic as in _check_period_quality).
# Estimated combined mean = (combined_min + combined_max) / 2
r_min = relaxed.get("price_min")
r_max = relaxed.get("price_max")
e_min = existing.get("price_min")
e_max = existing.get("price_max")
if None not in (r_min, r_max, e_min, e_max):
combined_min = min(float(r_min), float(e_min)) # type: ignore[arg-type]
combined_max = max(float(r_max), float(e_max)) # type: ignore[arg-type]
combined_mean = (combined_min + combined_max) / 2
if combined_mean < LOW_PRICE_QUALITY_BYPASS_THRESHOLD:
continue # Very low absolute price → allow merge, check next pair
estimated_cv = _estimate_merged_cv(existing, relaxed) estimated_cv = _estimate_merged_cv(existing, relaxed)
if estimated_cv is not None and estimated_cv > PERIOD_MAX_CV: if estimated_cv is not None and estimated_cv > PERIOD_MAX_CV:
_LOGGER.debug( _LOGGER.debug(

View file

@ -40,6 +40,23 @@ FLEX_HIGH_THRESHOLD_RELAXATION = 0.30 # 30% - WARNING: base flex too high for r
MIN_DURATION_FALLBACK_MINIMUM = 30 # Minimum period length to try (30 min = 2 intervals) MIN_DURATION_FALLBACK_MINIMUM = 30 # Minimum period length to try (30 min = 2 intervals)
MIN_DURATION_FALLBACK_STEP = 15 # Reduce by 15 min (1 interval) each step MIN_DURATION_FALLBACK_STEP = 15 # Reduce by 15 min (1 interval) each step
# Low absolute price threshold for quality gate bypass (in major currency unit, e.g. EUR/NOK)
# When the MEAN price of a period is below this level, the CV quality gate is bypassed.
# Relative CV is unreliable at very low absolute prices: a range of 1-4 ct shows CV≈50%
# but is practically homogeneous from a cost perspective.
# Value: LOW_PRICE_AVG_THRESHOLD (subunit) / 100 = 10 ct / 100 = 0.10 EUR/NOK
LOW_PRICE_QUALITY_BYPASS_THRESHOLD = 0.10 # EUR/NOK major unit (= 10 ct/øre)
# Span-to-ref ratio threshold for suppressing flex warnings on V-shape days.
# When span / ref_price < this on ANY available day, the warning is shown.
# Below 0.5 means span < 50% of ref_price → "normal" day (high flex is genuinely risky).
# Above 0.5 means V-shape day (span-based formula already compensates → warning less relevant).
FLEX_WARNING_VSHAPE_RATIO = 0.5 # span/ref_price ratio below which a day is considered "normal"
# On flat price days (low variation), it is unrealistic to require multiple distinct
# best/peak price periods. Requiring 2+ periods would force relaxation to create
# artificial periods that don't represent genuine price structure.
LOW_CV_FLAT_DAY_THRESHOLD = 10.0 # %: days with CV ≤ this need only 1 period
def _check_period_quality( def _check_period_quality(
period: dict, all_prices: list[dict], *, time: TibberPricesTimeService period: dict, all_prices: list[dict], *, time: TibberPricesTimeService
@ -95,6 +112,16 @@ def _check_period_quality(
if cv is None: if cv is None:
return True, None return True, None
# Low absolute price bypass: when the MEAN period price is very cheap (below
# LOW_PRICE_QUALITY_BYPASS_THRESHOLD), relative CV becomes unreliable.
# Example: period at 1-4 ct (mean 2 ct) shows CV≈50% but is practically
# homogeneous from a cost perspective; the quality gate should not reject it.
# Using mean (not range) to distinguish genuinely cheap days from flat-priced
# normal days: a flat day at 33-36 ct has a small range BUT a high mean → no bypass.
period_mean = sum(period_prices) / len(period_prices)
if period_mean < LOW_PRICE_QUALITY_BYPASS_THRESHOLD:
return True, cv # Passes quality gate: absolute price level is very low
passes = cv <= PERIOD_MAX_CV passes = cv <= PERIOD_MAX_CV
return passes, cv return passes, cv
@ -418,6 +445,79 @@ def _try_min_duration_fallback(
return None, metadata return None, metadata
def _compute_day_effective_min(
prices_by_day: dict,
min_periods: int,
*,
enable_relaxation: bool,
reverse_sort: bool,
) -> tuple[dict, int]:
"""
Compute per-day effective min_periods with flat-day adaptation.
On days with very low price variation (CV LOW_CV_FLAT_DAY_THRESHOLD),
requiring multiple distinct cheapest/peak periods is unrealistic. Finding
ONE period is sufficient because there is no meaningful price structure that
would create natural multiple periods.
This applies ONLY to BEST PRICE periods (reverse_sort=False). For PEAK PRICE
periods, full relaxation should run even on flat days because identifying the
genuinely most expensive window requires the complete filter evaluation.
(Design decision: if the user explicitly disabled relaxation, honour the
configured min_periods exactly regardless.)
Args:
prices_by_day: Dict of date list of price dicts
min_periods: Configured minimum periods per day
enable_relaxation: Whether relaxation is enabled
reverse_sort: True for peak price (no adaptation), False for best price
Returns:
Tuple of (dict of date effective min_periods for that day, count of flat days detected)
"""
day_effective_min = {}
flat_day_count = 0
min_prices_for_cv = 2 # Need at least 2 prices to calculate CV
for day, day_prices in prices_by_day.items():
if not enable_relaxation or min_periods <= 1 or reverse_sort:
# Relaxation disabled, already 1, or peak price: no adaptation
day_effective_min[day] = min_periods
continue
price_values = [float(p["total"]) for p in day_prices if p.get("total") is not None]
if len(price_values) < min_prices_for_cv:
day_effective_min[day] = min_periods
continue
day_cv = calculate_coefficient_of_variation(price_values)
if day_cv is not None and day_cv <= LOW_CV_FLAT_DAY_THRESHOLD:
day_effective_min[day] = 1
flat_day_count += 1
_LOGGER_DETAILS.debug(
"%sDay %s: flat price profile (CV=%.1f%%%.1f%%) → min_periods relaxed to 1",
INDENT_L1,
day,
day_cv,
LOW_CV_FLAT_DAY_THRESHOLD,
)
else:
day_effective_min[day] = min_periods
if flat_day_count > 0:
_LOGGER.info(
"Adaptive min_periods: %d flat day(s) (CV ≤ %.0f%%) need only 1 period instead of %d",
flat_day_count,
LOW_CV_FLAT_DAY_THRESHOLD,
min_periods,
)
return day_effective_min, flat_day_count
def calculate_periods_with_relaxation( # noqa: PLR0912, PLR0913, PLR0915 - Per-day relaxation requires many parameters and branches def calculate_periods_with_relaxation( # noqa: PLR0912, PLR0913, PLR0915 - Per-day relaxation requires many parameters and branches
all_prices: list[dict], all_prices: list[dict],
*, *,
@ -549,6 +649,55 @@ def calculate_periods_with_relaxation( # noqa: PLR0912, PLR0913, PLR0915 - Per-
prices_by_day = group_prices_by_day(all_prices, time=time) prices_by_day = group_prices_by_day(all_prices, time=time)
total_days = len(prices_by_day) total_days = len(prices_by_day)
# =========================================================================
# SPAN-AWARE FLEX WARNINGS
# =========================================================================
# With the span-based flex formula (flex_base = max(span, ref_price)), a high
# configured base_flex is less problematic on V-shape days (single low outlier)
# where span dominates. However, on normal days (span ≈ ref_price), high base_flex
# leaves little room for relaxation to escalate effectively.
#
# We only warn when relaxation is enabled AND the flex is high on typical days.
# V-shape days have span >> ref_price; normal days have span ≈ ref_price.
# We use the MEAN span/ref ratio across available days as a proxy:
# - Ratio near 1.0 → span ≈ ref (normal day, warning is relevant)
# - Ratio >> 1.0 → span >> ref (V-shape day, warning less relevant)
# Threshold: if any day has span/ref < 0.5 (i.e., span < 50% of ref_price),
# the spread is not extreme enough to suppress the warning.
if enable_relaxation and prices_by_day:
base_flex = abs(config.flex)
if base_flex >= FLEX_WARNING_THRESHOLD_RELAXATION:
# Check whether any available day is "normal enough" to make the warning relevant
any_normal_day = False
for day_prices in prices_by_day.values():
prices = [float(p["total"]) for p in day_prices if p.get("total") is not None]
if len(prices) >= 2: # noqa: PLR2004
day_min = min(prices)
day_avg = sum(prices) / len(prices)
span = abs(day_avg - day_min)
if day_min > 0 and span / day_min < FLEX_WARNING_VSHAPE_RATIO:
any_normal_day = True
break
if any_normal_day:
if base_flex >= FLEX_HIGH_THRESHOLD_RELAXATION:
_LOGGER.warning(
"Base flex %.0f%% is too high for relaxation mode. "
"Relaxation escalates in 3%% steps from your base - starting at %.0f%% "
"means only ~%.0f steps remain before the 50%% hard limit. "
"Recommendation: Use 15-20%% base flex with relaxation enabled.",
base_flex * 100,
base_flex * 100,
(MAX_FLEX_HARD_LIMIT - base_flex) / 0.03,
)
else:
_LOGGER.info(
"Base flex %.0f%% is high for relaxation mode (recommended: 15-20%%). "
"Only ~%.0f relaxation steps remain to 50%% hard limit.",
base_flex * 100,
(MAX_FLEX_HARD_LIMIT - base_flex) / 0.03,
)
_LOGGER.info( _LOGGER.info(
"Calculating baseline periods for %d days...", "Calculating baseline periods for %d days...",
total_days, total_days,
@ -567,21 +716,31 @@ def calculate_periods_with_relaxation( # noqa: PLR0912, PLR0913, PLR0915 - Per-
# Count periods per day for min_periods check # Count periods per day for min_periods check
periods_by_day = group_periods_by_day(all_periods) periods_by_day = group_periods_by_day(all_periods)
# Pre-compute per-day effective min_periods (adaptive for flat days)
# On flat price days (low CV), 1 period is sufficient - enforcing min_periods > 1
# would only produce artificial additional periods of no practical value.
# NOTE: Only applied for best price (reverse_sort=False); peak price always uses
# full relaxation to properly identify the genuinely most expensive window.
day_effective_min, flat_days_count = _compute_day_effective_min(
prices_by_day, min_periods, enable_relaxation=enable_relaxation, reverse_sort=config.reverse_sort
)
days_meeting_requirement = 0 days_meeting_requirement = 0
for day in sorted(prices_by_day.keys()): for day in sorted(prices_by_day.keys()):
day_periods = periods_by_day.get(day, []) day_periods = periods_by_day.get(day, [])
period_count = len(day_periods) period_count = len(day_periods)
effective_min = day_effective_min.get(day, min_periods)
_LOGGER_DETAILS.debug( _LOGGER_DETAILS.debug(
"%sDay %s baseline: Found %d periods%s", "%sDay %s baseline: Found %d periods%s",
INDENT_L1, INDENT_L1,
day, day,
period_count, period_count,
f" (need {min_periods})" if enable_relaxation else "", f" (need {effective_min})" if enable_relaxation else "",
) )
if period_count >= min_periods: if period_count >= effective_min:
days_meeting_requirement += 1 days_meeting_requirement += 1
# Check if relaxation is needed # Check if relaxation is needed
@ -620,7 +779,7 @@ def calculate_periods_with_relaxation( # noqa: PLR0912, PLR0913, PLR0915 - Per-
for day in sorted(prices_by_day.keys()): for day in sorted(prices_by_day.keys()):
day_periods = periods_by_day.get(day, []) day_periods = periods_by_day.get(day, [])
period_count = len(day_periods) period_count = len(day_periods)
if period_count >= min_periods: if period_count >= day_effective_min.get(day, min_periods):
days_meeting_requirement += 1 days_meeting_requirement += 1
# === MIN DURATION FALLBACK === # === MIN DURATION FALLBACK ===
@ -650,7 +809,7 @@ def calculate_periods_with_relaxation( # noqa: PLR0912, PLR0913, PLR0915 - Per-
for day in sorted(prices_by_day.keys()): for day in sorted(prices_by_day.keys()):
day_periods = periods_by_day.get(day, []) day_periods = periods_by_day.get(day, [])
period_count = len(day_periods) period_count = len(day_periods)
if period_count >= min_periods: if period_count >= day_effective_min.get(day, min_periods):
days_meeting_requirement += 1 days_meeting_requirement += 1
elif enable_relaxation: elif enable_relaxation:
@ -692,6 +851,7 @@ def calculate_periods_with_relaxation( # noqa: PLR0912, PLR0913, PLR0915 - Per-
"days_processed": total_days, "days_processed": total_days,
"days_meeting_requirement": days_meeting_requirement, "days_meeting_requirement": days_meeting_requirement,
"relaxation_incomplete": days_meeting_requirement < total_days, "relaxation_incomplete": days_meeting_requirement < total_days,
"flat_days_detected": flat_days_count, # Days where adaptive min_periods (CV-based) reduced target to 1
} }
return final_result return final_result

View file

@ -148,8 +148,9 @@ class TestBestPriceGenerationWorks:
periods = result.get("periods", []) periods = result.get("periods", [])
# Validation: periods found # Validation: periods found
assert len(periods) > 0, "Best periods should generate" # Note: test fixture is a flat day (CV≈5.4%). Adaptive min_periods correctly
assert 2 <= len(periods) <= 5, f"Expected 2-5 periods, got {len(periods)}" # returns 1 period instead of forcing a 2nd artificial period.
assert len(periods) >= 1, f"Best periods should generate, got {len(periods)}"
def test_positive_flex_produces_periods(self) -> None: def test_positive_flex_produces_periods(self) -> None:
""" """
@ -188,7 +189,8 @@ class TestBestPriceGenerationWorks:
periods_pos = result_pos.get("periods", []) periods_pos = result_pos.get("periods", [])
# With positive flex, should find periods # With positive flex, should find periods
assert len(periods_pos) >= 2, f"Should find periods with positive flex, got {len(periods_pos)}" # Note: flat day (CV≈5.4%) → adaptive min_periods returns 1 period (correct behavior).
assert len(periods_pos) >= 1, f"Should find periods with positive flex, got {len(periods_pos)}"
def test_periods_contain_low_prices(self) -> None: def test_periods_contain_low_prices(self) -> None:
""" """
@ -271,8 +273,11 @@ class TestBestPriceGenerationWorks:
periods = result.get("periods", []) periods = result.get("periods", [])
# Should find periods via relaxation # Should find at least 1 period.
assert len(periods) >= 2, "Relaxation should find periods" # Note: flat day (CV≈5.4%) → adaptive min_periods correctly needs only 1 period,
# which baseline already provides without relaxation. This validates that
# over-relaxation is skipped on truly flat days.
assert len(periods) >= 1, "Should find at least 1 period"
# Check if relaxation was used # Check if relaxation was used
relaxation_meta = result.get("metadata", {}).get("relaxation", {}) relaxation_meta = result.get("metadata", {}).get("relaxation", {})