feat(periods): add quality gates for period homogeneity

Prevent relaxation from creating heterogeneous periods:

1. CV-based Quality Gate (PERIOD_MAX_CV = 25%)
   - Periods with internal CV >25% are rejected during relaxation
   - CV field added to period statistics for transparency

2. Period Overlap Protection
   - New periods cannot "swallow" existing smaller periods
   - CV-based merge blocking prevents heterogeneous combinations
   - Preserves good baseline periods from relaxation replacement

3. Constants in types.py
   - PERIOD_MAX_CV, CROSS_DAY_*, SUPERSESSION_* thresholds
   - TibberPricesPeriodStatistics extended with coefficient_of_variation field

Impact: Users get smaller, more homogeneous periods that better represent
actual cheap/expensive windows.
This commit is contained in:
Julian Pawlowski 2025-12-22 23:21:51 +00:00
parent 7ee013daf2
commit 5ef0396c8b
4 changed files with 544 additions and 70 deletions

View file

@ -17,6 +17,41 @@ INDENT_L1 = " " # Nested logic / loop iterations
INDENT_L2 = " " # Deeper nesting INDENT_L2 = " " # Deeper nesting
def _estimate_merged_cv(period1: dict, period2: dict) -> float | None:
"""
Estimate the CV of a merged period from two period summaries.
Since we don't have the raw prices, we estimate using the combined min/max range.
This is a conservative estimate - the actual CV could be higher or lower.
Formula: CV (range / 2) / mean * 100
Where range = max - min, mean = (min + max) / 2
This approximation assumes roughly uniform distribution within the range.
"""
p1_min = period1.get("price_min")
p1_max = period1.get("price_max")
p2_min = period2.get("price_min")
p2_max = period2.get("price_max")
if None in (p1_min, p1_max, p2_min, p2_max):
return None
# Cast to float - None case handled above
combined_min = min(float(p1_min), float(p2_min)) # type: ignore[arg-type]
combined_max = max(float(p1_max), float(p2_max)) # type: ignore[arg-type]
if combined_min <= 0:
return None
combined_mean = (combined_min + combined_max) / 2
price_range = combined_max - combined_min
# CV estimate based on range (assuming uniform distribution)
# For uniform distribution: std_dev ≈ range / sqrt(12) ≈ range / 3.46
return (price_range / 3.46) / combined_mean * 100
def recalculate_period_metadata(periods: list[dict], *, time: TibberPricesTimeService) -> None: def recalculate_period_metadata(periods: list[dict], *, time: TibberPricesTimeService) -> None:
""" """
Recalculate period metadata after merging periods. Recalculate period metadata after merging periods.
@ -120,6 +155,119 @@ def merge_adjacent_periods(period1: dict, period2: dict) -> dict:
return merged return merged
def _check_merge_quality_gate(periods_to_merge: list[tuple[int, dict]], relaxed: dict) -> bool:
"""
Check if merging would create a period that's too heterogeneous.
Returns True if merge is allowed, False if blocked by Quality Gate.
"""
from .types import PERIOD_MAX_CV # noqa: PLC0415
relaxed_start = relaxed["start"]
relaxed_end = relaxed["end"]
for _idx, existing in periods_to_merge:
estimated_cv = _estimate_merged_cv(existing, relaxed)
if estimated_cv is not None and estimated_cv > PERIOD_MAX_CV:
_LOGGER.debug(
"Merge blocked by Quality Gate: %s-%s + %s-%s would have CV≈%.1f%% (max: %.1f%%)",
existing["start"].strftime("%H:%M"),
existing["end"].strftime("%H:%M"),
relaxed_start.strftime("%H:%M"),
relaxed_end.strftime("%H:%M"),
estimated_cv,
PERIOD_MAX_CV,
)
return False
return True
def _would_swallow_existing(relaxed: dict, existing_periods: list[dict]) -> bool:
"""
Check if the relaxed period would "swallow" any existing period.
A period is "swallowed" if the new relaxed period completely contains it.
In this case, we should NOT merge - the existing smaller period is more
homogeneous and should be preserved.
This prevents relaxation from replacing good small periods with larger,
more heterogeneous ones.
Returns:
True if any existing period would be swallowed (merge should be blocked)
False if safe to proceed with merge evaluation
"""
relaxed_start = relaxed["start"]
relaxed_end = relaxed["end"]
for existing in existing_periods:
existing_start = existing["start"]
existing_end = existing["end"]
# Check if relaxed completely contains existing
if relaxed_start <= existing_start and relaxed_end >= existing_end:
_LOGGER.debug(
"Blocking merge: %s-%s would swallow %s-%s (keeping smaller period)",
relaxed_start.strftime("%H:%M"),
relaxed_end.strftime("%H:%M"),
existing_start.strftime("%H:%M"),
existing_end.strftime("%H:%M"),
)
return True
return False
def _is_duplicate_period(relaxed: dict, existing_periods: list[dict], tolerance_seconds: int = 60) -> bool:
"""Check if relaxed period is a duplicate of any existing period."""
relaxed_start = relaxed["start"]
relaxed_end = relaxed["end"]
for existing in existing_periods:
if (
abs((relaxed_start - existing["start"]).total_seconds()) < tolerance_seconds
and abs((relaxed_end - existing["end"]).total_seconds()) < tolerance_seconds
):
_LOGGER_DETAILS.debug(
"%sSkipping duplicate period %s-%s (already exists)",
INDENT_L1,
relaxed_start.strftime("%H:%M"),
relaxed_end.strftime("%H:%M"),
)
return True
return False
def _find_adjacent_or_overlapping(relaxed: dict, existing_periods: list[dict]) -> list[tuple[int, dict]]:
"""Find all periods that are adjacent to or overlapping with the relaxed period."""
relaxed_start = relaxed["start"]
relaxed_end = relaxed["end"]
periods_to_merge = []
for idx, existing in enumerate(existing_periods):
existing_start = existing["start"]
existing_end = existing["end"]
# Check if adjacent (no gap) or overlapping
is_adjacent = relaxed_end == existing_start or relaxed_start == existing_end
is_overlapping = relaxed_start < existing_end and relaxed_end > existing_start
if is_adjacent or is_overlapping:
periods_to_merge.append((idx, existing))
_LOGGER_DETAILS.debug(
"%sPeriod %s-%s %s with existing period %s-%s",
INDENT_L1,
relaxed_start.strftime("%H:%M"),
relaxed_end.strftime("%H:%M"),
"overlaps" if is_overlapping else "is adjacent to",
existing_start.strftime("%H:%M"),
existing_end.strftime("%H:%M"),
)
return periods_to_merge
def resolve_period_overlaps( def resolve_period_overlaps(
existing_periods: list[dict], existing_periods: list[dict],
new_relaxed_periods: list[dict], new_relaxed_periods: list[dict],
@ -130,6 +278,10 @@ def resolve_period_overlaps(
Adjacent or overlapping periods are merged into single continuous periods. Adjacent or overlapping periods are merged into single continuous periods.
The newer period's relaxation attributes override the older period's. The newer period's relaxation attributes override the older period's.
Quality Gate: Merging is blocked if the combined period would have
an estimated CV above PERIOD_MAX_CV (25%), to prevent creating
periods with excessive internal price variation.
This function is called incrementally after each relaxation phase: This function is called incrementally after each relaxation phase:
- Phase 1: existing = baseline, new = first relaxation - Phase 1: existing = baseline, new = first relaxation
- Phase 2: existing = baseline + phase 1, new = second relaxation - Phase 2: existing = baseline + phase 1, new = second relaxation
@ -167,46 +319,16 @@ def resolve_period_overlaps(
relaxed_end = relaxed["end"] relaxed_end = relaxed["end"]
# Check if this period is duplicate (exact match within tolerance) # Check if this period is duplicate (exact match within tolerance)
tolerance_seconds = 60 # 1 minute tolerance if _is_duplicate_period(relaxed, merged):
is_duplicate = False continue
for existing in merged:
if (
abs((relaxed_start - existing["start"]).total_seconds()) < tolerance_seconds
and abs((relaxed_end - existing["end"]).total_seconds()) < tolerance_seconds
):
is_duplicate = True
_LOGGER_DETAILS.debug(
"%sSkipping duplicate period %s-%s (already exists)",
INDENT_L1,
relaxed_start.strftime("%H:%M"),
relaxed_end.strftime("%H:%M"),
)
break
if is_duplicate: # Check if this period would "swallow" an existing smaller period
# In that case, skip it - the smaller existing period is more homogeneous
if _would_swallow_existing(relaxed, merged):
continue continue
# Find periods that are adjacent or overlapping (should be merged) # Find periods that are adjacent or overlapping (should be merged)
periods_to_merge = [] periods_to_merge = _find_adjacent_or_overlapping(relaxed, merged)
for idx, existing in enumerate(merged):
existing_start = existing["start"]
existing_end = existing["end"]
# Check if adjacent (no gap) or overlapping
is_adjacent = relaxed_end == existing_start or relaxed_start == existing_end
is_overlapping = relaxed_start < existing_end and relaxed_end > existing_start
if is_adjacent or is_overlapping:
periods_to_merge.append((idx, existing))
_LOGGER_DETAILS.debug(
"%sPeriod %s-%s %s with existing period %s-%s",
INDENT_L1,
relaxed_start.strftime("%H:%M"),
relaxed_end.strftime("%H:%M"),
"overlaps" if is_overlapping else "is adjacent to",
existing_start.strftime("%H:%M"),
existing_end.strftime("%H:%M"),
)
if not periods_to_merge: if not periods_to_merge:
# No merge needed - add as new period # No merge needed - add as new period
@ -218,23 +340,39 @@ def resolve_period_overlaps(
relaxed_start.strftime("%H:%M"), relaxed_start.strftime("%H:%M"),
relaxed_end.strftime("%H:%M"), relaxed_end.strftime("%H:%M"),
) )
else: continue
# Merge with all adjacent/overlapping periods
# Start with the new relaxed period
merged_period = relaxed.copy()
# Remove old periods (in reverse order to maintain indices) # Quality Gate: Check if merging would create a period that's too heterogeneous
for idx, existing in reversed(periods_to_merge): should_merge = _check_merge_quality_gate(periods_to_merge, relaxed)
merged_period = merge_adjacent_periods(existing, merged_period)
merged.pop(idx)
# Add the merged result if not should_merge:
merged.append(merged_period) # Don't merge - add as separate period instead
merged.append(relaxed)
periods_added += 1
_LOGGER_DETAILS.debug(
"%sAdded new period %s-%s separately (merge blocked by CV gate)",
INDENT_L1,
relaxed_start.strftime("%H:%M"),
relaxed_end.strftime("%H:%M"),
)
continue
# Count as added if we merged exactly one existing period # Merge with all adjacent/overlapping periods
# (means we extended/merged, not replaced multiple) # Start with the new relaxed period
if len(periods_to_merge) == 1: merged_period = relaxed.copy()
periods_added += 1
# Remove old periods (in reverse order to maintain indices)
for idx, existing in reversed(periods_to_merge):
merged_period = merge_adjacent_periods(existing, merged_period)
merged.pop(idx)
# Add the merged result
merged.append(merged_period)
# Count as added if we merged exactly one existing period
# (means we extended/merged, not replaced multiple)
if len(periods_to_merge) == 1:
periods_added += 1
# Sort all periods by start time # Sort all periods by start time
merged.sort(key=lambda p: p["start"]) merged.sort(key=lambda p: p["start"])

View file

@ -19,6 +19,7 @@ from custom_components.tibber_prices.utils.average import calculate_median
from custom_components.tibber_prices.utils.price import ( from custom_components.tibber_prices.utils.price import (
aggregate_period_levels, aggregate_period_levels,
aggregate_period_ratings, aggregate_period_ratings,
calculate_coefficient_of_variation,
calculate_volatility_level, calculate_volatility_level,
) )
@ -170,6 +171,7 @@ def build_period_summary_dict(
"price_max": stats.price_max, "price_max": stats.price_max,
"price_spread": stats.price_spread, "price_spread": stats.price_spread,
"volatility": stats.volatility, "volatility": stats.volatility,
"coefficient_of_variation": stats.coefficient_of_variation,
# 4. Price differences will be added below if available # 4. Price differences will be added below if available
# 5. Detail information (additional context) # 5. Detail information (additional context)
"period_interval_count": period_data.period_length, "period_interval_count": period_data.period_length,
@ -314,7 +316,10 @@ def extract_period_summaries(
# Extract prices for volatility calculation (coefficient of variation) # Extract prices for volatility calculation (coefficient of variation)
prices_for_volatility = [float(p["total"]) for p in period_price_data if "total" in p] prices_for_volatility = [float(p["total"]) for p in period_price_data if "total" in p]
# Calculate volatility (categorical) and aggregated rating difference (numeric) # Calculate CV (numeric) for quality gate checks
period_cv = calculate_coefficient_of_variation(prices_for_volatility)
# Calculate volatility (categorical) using thresholds
volatility = calculate_volatility_level( volatility = calculate_volatility_level(
prices_for_volatility, prices_for_volatility,
threshold_moderate=thresholds.threshold_volatility_moderate, threshold_moderate=thresholds.threshold_volatility_moderate,
@ -348,6 +353,7 @@ def extract_period_summaries(
price_max=price_stats["price_max"], price_max=price_stats["price_max"],
price_spread=price_stats["price_spread"], price_spread=price_stats["price_spread"],
volatility=volatility, volatility=volatility,
coefficient_of_variation=round(period_cv, 1) if period_cv is not None else None,
period_price_diff=period_price_diff, period_price_diff=period_price_diff,
period_price_diff_pct=period_price_diff_pct, period_price_diff_pct=period_price_diff_pct,
) )

View file

@ -11,7 +11,7 @@ if TYPE_CHECKING:
from custom_components.tibber_prices.coordinator.time_service import TibberPricesTimeService from custom_components.tibber_prices.coordinator.time_service import TibberPricesTimeService
from .types import TibberPricesPeriodConfig from custom_components.tibber_prices.utils.price import calculate_coefficient_of_variation
from .period_overlap import ( from .period_overlap import (
recalculate_period_metadata, recalculate_period_metadata,
@ -21,6 +21,8 @@ from .types import (
INDENT_L0, INDENT_L0,
INDENT_L1, INDENT_L1,
INDENT_L2, INDENT_L2,
PERIOD_MAX_CV,
TibberPricesPeriodConfig,
) )
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -32,6 +34,125 @@ FLEX_WARNING_THRESHOLD_RELAXATION = 0.25 # 25% - INFO: suggest lowering to 15-2
MAX_FLEX_HARD_LIMIT = 0.50 # 50% - hard maximum flex value MAX_FLEX_HARD_LIMIT = 0.50 # 50% - hard maximum flex value
FLEX_HIGH_THRESHOLD_RELAXATION = 0.30 # 30% - WARNING: base flex too high for relaxation mode FLEX_HIGH_THRESHOLD_RELAXATION = 0.30 # 30% - WARNING: base flex too high for relaxation mode
# Min duration fallback constants
# When all relaxation phases are exhausted and still no periods found,
# gradually reduce min_period_length to find at least something
MIN_DURATION_FALLBACK_MINIMUM = 30 # Minimum period length to try (30 min = 2 intervals)
MIN_DURATION_FALLBACK_STEP = 15 # Reduce by 15 min (1 interval) each step
def _check_period_quality(
period: dict, all_prices: list[dict], *, time: TibberPricesTimeService
) -> tuple[bool, float | None]:
"""
Check if a period passes the quality gate (internal CV not too high).
The Quality Gate prevents relaxation from creating periods with too much
internal price variation. A "best price period" with prices ranging from
0.5 to 1.0 kr/kWh is not useful - user can't trust it's actually "best".
Args:
period: Period summary dict with "start" and "end" datetime
all_prices: All price intervals (to look up prices for CV calculation)
time: Time service for interval time parsing
Returns:
Tuple of (passes_quality_gate, cv_value)
- passes_quality_gate: True if CV <= PERIOD_MAX_CV
- cv_value: Calculated CV as percentage, or None if not calculable
"""
start_time = period.get("start")
end_time = period.get("end")
if not start_time or not end_time:
return True, None # Can't check, assume OK
# Build lookup for prices
price_lookup: dict[str, float] = {}
for price_data in all_prices:
interval_time = time.get_interval_time(price_data)
if interval_time:
price_lookup[interval_time.isoformat()] = float(price_data["total"])
# Collect prices within the period
period_prices: list[float] = []
interval_duration = time.get_interval_duration()
current = start_time
while current < end_time:
price = price_lookup.get(current.isoformat())
if price is not None:
period_prices.append(price)
current = current + interval_duration
# Need at least 2 prices to calculate CV (same as MIN_PRICES_FOR_VOLATILITY in price.py)
min_prices_for_cv = 2
if len(period_prices) < min_prices_for_cv:
return True, None # Too few prices to calculate CV
cv = calculate_coefficient_of_variation(period_prices)
if cv is None:
return True, None
passes = cv <= PERIOD_MAX_CV
return passes, cv
def _count_quality_periods(
periods: list[dict],
all_prices: list[dict],
prices_by_day: dict[date, list[dict]],
min_periods: int,
*,
time: TibberPricesTimeService,
) -> tuple[int, int]:
"""
Count days meeting requirement when considering quality gate.
Only periods passing the quality gate (CV <= PERIOD_MAX_CV) are counted
towards meeting the min_periods requirement.
Args:
periods: List of all periods
all_prices: All price intervals
prices_by_day: Price intervals grouped by day
min_periods: Target periods per day
time: Time service
Returns:
Tuple of (days_meeting_requirement, total_quality_periods)
"""
periods_by_day = group_periods_by_day(periods)
days_meeting_requirement = 0
total_quality_periods = 0
for day in sorted(prices_by_day.keys()):
day_periods = periods_by_day.get(day, [])
quality_count = 0
for period in day_periods:
passes, cv = _check_period_quality(period, all_prices, time=time)
if passes:
quality_count += 1
else:
_LOGGER_DETAILS.debug(
"%s Day %s: Period %s-%s REJECTED by quality gate (CV=%.1f%% > %.1f%%)",
INDENT_L2,
day,
period.get("start", "?").strftime("%H:%M") if hasattr(period.get("start"), "strftime") else "?",
period.get("end", "?").strftime("%H:%M") if hasattr(period.get("end"), "strftime") else "?",
cv or 0,
PERIOD_MAX_CV,
)
total_quality_periods += quality_count
if quality_count >= min_periods:
days_meeting_requirement += 1
return days_meeting_requirement, total_quality_periods
def group_periods_by_day(periods: list[dict]) -> dict[date, list[dict]]: def group_periods_by_day(periods: list[dict]) -> dict[date, list[dict]]:
""" """
@ -137,7 +258,167 @@ def group_prices_by_day(all_prices: list[dict], *, time: TibberPricesTimeService
return prices_by_day return prices_by_day
def calculate_periods_with_relaxation( # noqa: PLR0913, PLR0915 - Per-day relaxation requires many parameters and statements def _try_min_duration_fallback(
*,
config: TibberPricesPeriodConfig,
existing_periods: list[dict],
prices_by_day: dict[date, list[dict]],
time: TibberPricesTimeService,
) -> tuple[dict[str, Any] | None, dict[str, Any]]:
"""
Try reducing min_period_length to find periods when relaxation is exhausted.
This is a LAST RESORT mechanism. It only activates when:
1. All relaxation phases have been tried
2. Some days STILL have zero periods (not just below min_periods)
The fallback progressively reduces min_period_length:
- 60 min (default) 45 min 30 min (minimum)
It does NOT reduce below 30 min (2 intervals) because a single 15-min
interval is essentially just the daily min/max price - not a "period".
Args:
config: Period configuration
existing_periods: Periods found so far (from relaxation)
prices_by_day: Price intervals grouped by day
time: Time service instance
Returns:
Tuple of (result dict with periods, metadata dict) or (None, empty metadata)
"""
from .core import calculate_periods # noqa: PLC0415 - Avoid circular import
metadata: dict[str, Any] = {"phases_used": [], "fallback_active": False}
# Only try fallback if current min_period_length > minimum
if config.min_period_length <= MIN_DURATION_FALLBACK_MINIMUM:
return None, metadata
# Check which days have ZERO periods (not just below target)
existing_by_day = group_periods_by_day(existing_periods)
days_with_zero_periods = [day for day in prices_by_day if not existing_by_day.get(day)]
if not days_with_zero_periods:
_LOGGER_DETAILS.debug(
"%sMin duration fallback: All days have at least one period - no fallback needed",
INDENT_L1,
)
return None, metadata
_LOGGER.info(
"Min duration fallback: %d day(s) have zero periods, trying shorter min_period_length...",
len(days_with_zero_periods),
)
# Try progressively shorter min_period_length
current_min_duration = config.min_period_length
fallback_periods: list[dict] = []
while current_min_duration > MIN_DURATION_FALLBACK_MINIMUM:
current_min_duration = max(
current_min_duration - MIN_DURATION_FALLBACK_STEP,
MIN_DURATION_FALLBACK_MINIMUM,
)
_LOGGER_DETAILS.debug(
"%sTrying min_period_length=%d min for days with zero periods",
INDENT_L2,
current_min_duration,
)
# Create modified config with shorter min_period_length
# Use maxed-out flex (50%) since we're in fallback mode
fallback_config = TibberPricesPeriodConfig(
reverse_sort=config.reverse_sort,
flex=MAX_FLEX_HARD_LIMIT, # Max flex
min_distance_from_avg=0, # Disable min_distance in fallback
min_period_length=current_min_duration,
threshold_low=config.threshold_low,
threshold_high=config.threshold_high,
threshold_volatility_moderate=config.threshold_volatility_moderate,
threshold_volatility_high=config.threshold_volatility_high,
threshold_volatility_very_high=config.threshold_volatility_very_high,
level_filter=None, # Disable level filter
gap_count=config.gap_count,
)
# Try to find periods for days with zero periods
for day in days_with_zero_periods:
day_prices = prices_by_day.get(day, [])
if not day_prices:
continue
try:
day_result = calculate_periods(
day_prices,
config=fallback_config,
time=time,
)
day_periods = day_result.get("periods", [])
if day_periods:
# Mark periods with fallback metadata
for period in day_periods:
period["duration_fallback_active"] = True
period["duration_fallback_min_length"] = current_min_duration
period["relaxation_active"] = True
period["relaxation_level"] = f"duration_fallback={current_min_duration}min"
fallback_periods.extend(day_periods)
_LOGGER.info(
"Min duration fallback: Found %d period(s) for %s at min_length=%d min",
len(day_periods),
day,
current_min_duration,
)
except (KeyError, ValueError, TypeError) as err:
_LOGGER.warning(
"Error during min duration fallback for %s: %s",
day,
err,
)
continue
# If we found periods for all zero-period days, we can stop
if fallback_periods:
# Remove days that now have periods from the list
fallback_by_day = group_periods_by_day(fallback_periods)
days_with_zero_periods = [day for day in days_with_zero_periods if not fallback_by_day.get(day)]
if not days_with_zero_periods:
break
if fallback_periods:
# Merge with existing periods
# resolve_period_overlaps merges adjacent/overlapping periods
merged_periods, _new_count = resolve_period_overlaps(
existing_periods,
fallback_periods,
)
recalculate_period_metadata(merged_periods, time=time)
metadata["fallback_active"] = True
metadata["phases_used"] = [f"duration_fallback (min_length={current_min_duration}min)"]
_LOGGER.info(
"Min duration fallback complete: Added %d period(s), total now %d",
len(fallback_periods),
len(merged_periods),
)
return {"periods": merged_periods}, metadata
_LOGGER.warning(
"Min duration fallback: Still %d day(s) with zero periods after trying all durations",
len(days_with_zero_periods),
)
return None, metadata
def calculate_periods_with_relaxation( # noqa: PLR0912, PLR0913, PLR0915 - Per-day relaxation requires many parameters and branches
all_prices: list[dict], all_prices: list[dict],
*, *,
config: TibberPricesPeriodConfig, config: TibberPricesPeriodConfig,
@ -185,6 +466,9 @@ def calculate_periods_with_relaxation( # noqa: PLR0913, PLR0915 - Per-day relax
from .core import ( # noqa: PLC0415 from .core import ( # noqa: PLC0415
calculate_periods, calculate_periods,
) )
from .period_building import ( # noqa: PLC0415
filter_superseded_periods,
)
# Compact INFO-level summary # Compact INFO-level summary
period_type = "PEAK PRICE" if config.reverse_sort else "BEST PRICE" period_type = "PEAK PRICE" if config.reverse_sort else "BEST PRICE"
@ -338,6 +622,37 @@ def calculate_periods_with_relaxation( # noqa: PLR0913, PLR0915 - Per-day relax
period_count = len(day_periods) period_count = len(day_periods)
if period_count >= min_periods: if period_count >= min_periods:
days_meeting_requirement += 1 days_meeting_requirement += 1
# === MIN DURATION FALLBACK ===
# If still no periods after relaxation, try reducing min_period_length
# This is a last resort to ensure users always get SOME period
if days_meeting_requirement < total_days and config.min_period_length > MIN_DURATION_FALLBACK_MINIMUM:
_LOGGER.info(
"Relaxation incomplete (%d/%d days). Trying min_duration fallback...",
days_meeting_requirement,
total_days,
)
fallback_result, fallback_metadata = _try_min_duration_fallback(
config=config,
existing_periods=all_periods,
prices_by_day=prices_by_day,
time=time,
)
if fallback_result:
all_periods = fallback_result["periods"]
all_phases_used.extend(fallback_metadata.get("phases_used", []))
# Recount after fallback
periods_by_day = group_periods_by_day(all_periods)
days_meeting_requirement = 0
for day in sorted(prices_by_day.keys()):
day_periods = periods_by_day.get(day, [])
period_count = len(day_periods)
if period_count >= min_periods:
days_meeting_requirement += 1
elif enable_relaxation: elif enable_relaxation:
_LOGGER_DETAILS.debug( _LOGGER_DETAILS.debug(
"%sAll %d days met target with baseline - no relaxation needed", "%sAll %d days met target with baseline - no relaxation needed",
@ -351,6 +666,14 @@ def calculate_periods_with_relaxation( # noqa: PLR0913, PLR0915 - Per-day relax
# Recalculate metadata for combined periods # Recalculate metadata for combined periods
recalculate_period_metadata(all_periods, time=time) recalculate_period_metadata(all_periods, time=time)
# Apply cross-day supersession filter (only for best-price periods)
# This removes late-night today periods that are superseded by better tomorrow alternatives
all_periods = filter_superseded_periods(
all_periods,
time=time,
reverse_sort=config.reverse_sort,
)
# Build final result # Build final result
final_result = baseline_result.copy() final_result = baseline_result.copy()
final_result["periods"] = all_periods final_result["periods"] = all_periods
@ -491,23 +814,11 @@ def relax_all_prices( # noqa: PLR0913 - Comprehensive filter relaxation require
new_relaxed_periods=new_periods, new_relaxed_periods=new_periods,
) )
# Count periods per day to check if requirement met # Count periods per day with QUALITY GATE check
periods_by_day = group_periods_by_day(combined) # Only periods with CV <= PERIOD_MAX_CV count towards min_periods requirement
days_meeting_requirement = 0 days_meeting_requirement, quality_period_count = _count_quality_periods(
combined, all_prices, prices_by_day, min_periods, time=time
for day in sorted(prices_by_day.keys()): )
day_periods = periods_by_day.get(day, [])
period_count = len(day_periods)
if period_count >= min_periods:
days_meeting_requirement += 1
_LOGGER_DETAILS.debug(
"%s Day %s: %d periods%s",
INDENT_L2,
day,
period_count,
"" if period_count >= min_periods else f" (need {min_periods})",
)
total_periods = len(combined) total_periods = len(combined)
_LOGGER_DETAILS.debug( _LOGGER_DETAILS.debug(

View file

@ -15,6 +15,24 @@ from custom_components.tibber_prices.const import (
DEFAULT_VOLATILITY_THRESHOLD_VERY_HIGH, DEFAULT_VOLATILITY_THRESHOLD_VERY_HIGH,
) )
# Quality Gate: Maximum coefficient of variation (CV) allowed within a period
# Periods with internal CV above this are considered too heterogeneous for "best price"
# A 25% CV means the std dev is 25% of the mean - beyond this, prices vary too much
# Example: Period with prices 0.7-0.99 kr has ~15% CV which is acceptable
# Period with prices 0.5-1.0 kr has ~30% CV which would be rejected
PERIOD_MAX_CV = 25.0 # 25% max coefficient of variation within a period
# Cross-Day Extension: Time window constants
# When a period ends late in the day and tomorrow data is available,
# we can extend it past midnight if prices remain favorable
CROSS_DAY_LATE_PERIOD_START_HOUR = 20 # Consider periods starting at 20:00 or later for extension
CROSS_DAY_MAX_EXTENSION_HOUR = 8 # Don't extend beyond 08:00 next day (covers typical night low)
# Cross-Day Supersession: When tomorrow data arrives, late-night periods that are
# worse than early-morning tomorrow periods become obsolete
# A today period is "superseded" if tomorrow has a significantly better alternative
SUPERSESSION_PRICE_IMPROVEMENT_PCT = 10.0 # Tomorrow must be at least 10% cheaper to supersede
# Log indentation levels for visual hierarchy # Log indentation levels for visual hierarchy
INDENT_L0 = "" # Top level (calculate_periods_with_relaxation) INDENT_L0 = "" # Top level (calculate_periods_with_relaxation)
INDENT_L1 = " " # Per-day loop INDENT_L1 = " " # Per-day loop
@ -62,6 +80,7 @@ class TibberPricesPeriodStatistics(NamedTuple):
price_max: float price_max: float
price_spread: float price_spread: float
volatility: str volatility: str
coefficient_of_variation: float | None # CV as percentage (e.g., 15.0 for 15%)
period_price_diff: float | None period_price_diff: float | None
period_price_diff_pct: float | None period_price_diff_pct: float | None