diff --git a/custom_components/tibber_prices/coordinator/period_handlers/period_overlap.py b/custom_components/tibber_prices/coordinator/period_handlers/period_overlap.py index 079f62a..cecdc76 100644 --- a/custom_components/tibber_prices/coordinator/period_handlers/period_overlap.py +++ b/custom_components/tibber_prices/coordinator/period_handlers/period_overlap.py @@ -17,6 +17,41 @@ INDENT_L1 = " " # Nested logic / loop iterations INDENT_L2 = " " # Deeper nesting +def _estimate_merged_cv(period1: dict, period2: dict) -> float | None: + """ + Estimate the CV of a merged period from two period summaries. + + Since we don't have the raw prices, we estimate using the combined min/max range. + This is a conservative estimate - the actual CV could be higher or lower. + + Formula: CV ≈ (range / sqrt(12)) / mean * 100 + Where range = max - min, mean = (min + max) / 2 + + This approximation assumes roughly uniform distribution within the range. + """ + p1_min = period1.get("price_min") + p1_max = period1.get("price_max") + p2_min = period2.get("price_min") + p2_max = period2.get("price_max") + + if None in (p1_min, p1_max, p2_min, p2_max): + return None + + # Cast to float - None case handled above + combined_min = min(float(p1_min), float(p2_min)) # type: ignore[arg-type] + combined_max = max(float(p1_max), float(p2_max)) # type: ignore[arg-type] + + if combined_min <= 0: + return None + + combined_mean = (combined_min + combined_max) / 2 + price_range = combined_max - combined_min + + # CV estimate based on range (assuming uniform distribution) + # For uniform distribution: std_dev ≈ range / sqrt(12) ≈ range / 3.46 + return (price_range / 3.46) / combined_mean * 100 + + def recalculate_period_metadata(periods: list[dict], *, time: TibberPricesTimeService) -> None: """ Recalculate period metadata after merging periods. 
@@ -120,6 +155,119 @@ def merge_adjacent_periods(period1: dict, period2: dict) -> dict: return merged +def _check_merge_quality_gate(periods_to_merge: list[tuple[int, dict]], relaxed: dict) -> bool: + """ + Check if merging would create a period that's too heterogeneous. + + Returns True if merge is allowed, False if blocked by Quality Gate. + """ + from .types import PERIOD_MAX_CV # noqa: PLC0415 + + relaxed_start = relaxed["start"] + relaxed_end = relaxed["end"] + + for _idx, existing in periods_to_merge: + estimated_cv = _estimate_merged_cv(existing, relaxed) + if estimated_cv is not None and estimated_cv > PERIOD_MAX_CV: + _LOGGER.debug( + "Merge blocked by Quality Gate: %s-%s + %s-%s would have CV≈%.1f%% (max: %.1f%%)", + existing["start"].strftime("%H:%M"), + existing["end"].strftime("%H:%M"), + relaxed_start.strftime("%H:%M"), + relaxed_end.strftime("%H:%M"), + estimated_cv, + PERIOD_MAX_CV, + ) + return False + return True + + +def _would_swallow_existing(relaxed: dict, existing_periods: list[dict]) -> bool: + """ + Check if the relaxed period would "swallow" any existing period. + + A period is "swallowed" if the new relaxed period completely contains it. + In this case, we should NOT merge - the existing smaller period is more + homogeneous and should be preserved. + + This prevents relaxation from replacing good small periods with larger, + more heterogeneous ones. 
+ + Returns: + True if any existing period would be swallowed (merge should be blocked) + False if safe to proceed with merge evaluation + + """ + relaxed_start = relaxed["start"] + relaxed_end = relaxed["end"] + + for existing in existing_periods: + existing_start = existing["start"] + existing_end = existing["end"] + + # Check if relaxed completely contains existing + if relaxed_start <= existing_start and relaxed_end >= existing_end: + _LOGGER.debug( + "Blocking merge: %s-%s would swallow %s-%s (keeping smaller period)", + relaxed_start.strftime("%H:%M"), + relaxed_end.strftime("%H:%M"), + existing_start.strftime("%H:%M"), + existing_end.strftime("%H:%M"), + ) + return True + + return False + + +def _is_duplicate_period(relaxed: dict, existing_periods: list[dict], tolerance_seconds: int = 60) -> bool: + """Check if relaxed period is a duplicate of any existing period.""" + relaxed_start = relaxed["start"] + relaxed_end = relaxed["end"] + + for existing in existing_periods: + if ( + abs((relaxed_start - existing["start"]).total_seconds()) < tolerance_seconds + and abs((relaxed_end - existing["end"]).total_seconds()) < tolerance_seconds + ): + _LOGGER_DETAILS.debug( + "%sSkipping duplicate period %s-%s (already exists)", + INDENT_L1, + relaxed_start.strftime("%H:%M"), + relaxed_end.strftime("%H:%M"), + ) + return True + return False + + +def _find_adjacent_or_overlapping(relaxed: dict, existing_periods: list[dict]) -> list[tuple[int, dict]]: + """Find all periods that are adjacent to or overlapping with the relaxed period.""" + relaxed_start = relaxed["start"] + relaxed_end = relaxed["end"] + periods_to_merge = [] + + for idx, existing in enumerate(existing_periods): + existing_start = existing["start"] + existing_end = existing["end"] + + # Check if adjacent (no gap) or overlapping + is_adjacent = relaxed_end == existing_start or relaxed_start == existing_end + is_overlapping = relaxed_start < existing_end and relaxed_end > existing_start + + if is_adjacent or 
is_overlapping: + periods_to_merge.append((idx, existing)) + _LOGGER_DETAILS.debug( + "%sPeriod %s-%s %s with existing period %s-%s", + INDENT_L1, + relaxed_start.strftime("%H:%M"), + relaxed_end.strftime("%H:%M"), + "overlaps" if is_overlapping else "is adjacent to", + existing_start.strftime("%H:%M"), + existing_end.strftime("%H:%M"), + ) + + return periods_to_merge + + def resolve_period_overlaps( existing_periods: list[dict], new_relaxed_periods: list[dict], @@ -130,6 +278,10 @@ def resolve_period_overlaps( Adjacent or overlapping periods are merged into single continuous periods. The newer period's relaxation attributes override the older period's. + Quality Gate: Merging is blocked if the combined period would have + an estimated CV above PERIOD_MAX_CV (25%), to prevent creating + periods with excessive internal price variation. + This function is called incrementally after each relaxation phase: - Phase 1: existing = baseline, new = first relaxation - Phase 2: existing = baseline + phase 1, new = second relaxation @@ -167,46 +319,16 @@ def resolve_period_overlaps( relaxed_end = relaxed["end"] # Check if this period is duplicate (exact match within tolerance) - tolerance_seconds = 60 # 1 minute tolerance - is_duplicate = False - for existing in merged: - if ( - abs((relaxed_start - existing["start"]).total_seconds()) < tolerance_seconds - and abs((relaxed_end - existing["end"]).total_seconds()) < tolerance_seconds - ): - is_duplicate = True - _LOGGER_DETAILS.debug( - "%sSkipping duplicate period %s-%s (already exists)", - INDENT_L1, - relaxed_start.strftime("%H:%M"), - relaxed_end.strftime("%H:%M"), - ) - break + if _is_duplicate_period(relaxed, merged): + continue - if is_duplicate: + # Check if this period would "swallow" an existing smaller period + # In that case, skip it - the smaller existing period is more homogeneous + if _would_swallow_existing(relaxed, merged): continue # Find periods that are adjacent or overlapping (should be merged) - 
periods_to_merge = [] - for idx, existing in enumerate(merged): - existing_start = existing["start"] - existing_end = existing["end"] - - # Check if adjacent (no gap) or overlapping - is_adjacent = relaxed_end == existing_start or relaxed_start == existing_end - is_overlapping = relaxed_start < existing_end and relaxed_end > existing_start - - if is_adjacent or is_overlapping: - periods_to_merge.append((idx, existing)) - _LOGGER_DETAILS.debug( - "%sPeriod %s-%s %s with existing period %s-%s", - INDENT_L1, - relaxed_start.strftime("%H:%M"), - relaxed_end.strftime("%H:%M"), - "overlaps" if is_overlapping else "is adjacent to", - existing_start.strftime("%H:%M"), - existing_end.strftime("%H:%M"), - ) + periods_to_merge = _find_adjacent_or_overlapping(relaxed, merged) if not periods_to_merge: # No merge needed - add as new period @@ -218,23 +340,39 @@ def resolve_period_overlaps( relaxed_start.strftime("%H:%M"), relaxed_end.strftime("%H:%M"), ) - else: - # Merge with all adjacent/overlapping periods - # Start with the new relaxed period - merged_period = relaxed.copy() + continue - # Remove old periods (in reverse order to maintain indices) - for idx, existing in reversed(periods_to_merge): - merged_period = merge_adjacent_periods(existing, merged_period) - merged.pop(idx) + # Quality Gate: Check if merging would create a period that's too heterogeneous + should_merge = _check_merge_quality_gate(periods_to_merge, relaxed) - # Add the merged result - merged.append(merged_period) + if not should_merge: + # Don't merge - add as separate period instead + merged.append(relaxed) + periods_added += 1 + _LOGGER_DETAILS.debug( + "%sAdded new period %s-%s separately (merge blocked by CV gate)", + INDENT_L1, + relaxed_start.strftime("%H:%M"), + relaxed_end.strftime("%H:%M"), + ) + continue - # Count as added if we merged exactly one existing period - # (means we extended/merged, not replaced multiple) - if len(periods_to_merge) == 1: - periods_added += 1 + # Merge with all 
adjacent/overlapping periods + # Start with the new relaxed period + merged_period = relaxed.copy() + + # Remove old periods (in reverse order to maintain indices) + for idx, existing in reversed(periods_to_merge): + merged_period = merge_adjacent_periods(existing, merged_period) + merged.pop(idx) + + # Add the merged result + merged.append(merged_period) + + # Count as added if we merged exactly one existing period + # (means we extended/merged, not replaced multiple) + if len(periods_to_merge) == 1: + periods_added += 1 # Sort all periods by start time merged.sort(key=lambda p: p["start"]) diff --git a/custom_components/tibber_prices/coordinator/period_handlers/period_statistics.py b/custom_components/tibber_prices/coordinator/period_handlers/period_statistics.py index 50b5879..40304a3 100644 --- a/custom_components/tibber_prices/coordinator/period_handlers/period_statistics.py +++ b/custom_components/tibber_prices/coordinator/period_handlers/period_statistics.py @@ -19,6 +19,7 @@ from custom_components.tibber_prices.utils.average import calculate_median from custom_components.tibber_prices.utils.price import ( aggregate_period_levels, aggregate_period_ratings, + calculate_coefficient_of_variation, calculate_volatility_level, ) @@ -170,6 +171,7 @@ def build_period_summary_dict( "price_max": stats.price_max, "price_spread": stats.price_spread, "volatility": stats.volatility, + "coefficient_of_variation": stats.coefficient_of_variation, # 4. Price differences will be added below if available # 5. 
Detail information (additional context) "period_interval_count": period_data.period_length, @@ -314,7 +316,10 @@ def extract_period_summaries( # Extract prices for volatility calculation (coefficient of variation) prices_for_volatility = [float(p["total"]) for p in period_price_data if "total" in p] - # Calculate volatility (categorical) and aggregated rating difference (numeric) + # Calculate CV (numeric) for quality gate checks + period_cv = calculate_coefficient_of_variation(prices_for_volatility) + + # Calculate volatility (categorical) using thresholds volatility = calculate_volatility_level( prices_for_volatility, threshold_moderate=thresholds.threshold_volatility_moderate, @@ -348,6 +353,7 @@ def extract_period_summaries( price_max=price_stats["price_max"], price_spread=price_stats["price_spread"], volatility=volatility, + coefficient_of_variation=round(period_cv, 1) if period_cv is not None else None, period_price_diff=period_price_diff, period_price_diff_pct=period_price_diff_pct, ) diff --git a/custom_components/tibber_prices/coordinator/period_handlers/relaxation.py b/custom_components/tibber_prices/coordinator/period_handlers/relaxation.py index 5572cac..a43fec3 100644 --- a/custom_components/tibber_prices/coordinator/period_handlers/relaxation.py +++ b/custom_components/tibber_prices/coordinator/period_handlers/relaxation.py @@ -11,7 +11,7 @@ if TYPE_CHECKING: from custom_components.tibber_prices.coordinator.time_service import TibberPricesTimeService - from .types import TibberPricesPeriodConfig +from custom_components.tibber_prices.utils.price import calculate_coefficient_of_variation from .period_overlap import ( recalculate_period_metadata, @@ -21,6 +21,8 @@ from .types import ( INDENT_L0, INDENT_L1, INDENT_L2, + PERIOD_MAX_CV, + TibberPricesPeriodConfig, ) _LOGGER = logging.getLogger(__name__) @@ -32,6 +34,125 @@ FLEX_WARNING_THRESHOLD_RELAXATION = 0.25 # 25% - INFO: suggest lowering to 15-2 MAX_FLEX_HARD_LIMIT = 0.50 # 50% - hard maximum flex 
value FLEX_HIGH_THRESHOLD_RELAXATION = 0.30 # 30% - WARNING: base flex too high for relaxation mode +# Min duration fallback constants +# When all relaxation phases are exhausted and still no periods found, +# gradually reduce min_period_length to find at least something +MIN_DURATION_FALLBACK_MINIMUM = 30 # Minimum period length to try (30 min = 2 intervals) +MIN_DURATION_FALLBACK_STEP = 15 # Reduce by 15 min (1 interval) each step + + +def _check_period_quality( + period: dict, all_prices: list[dict], *, time: TibberPricesTimeService +) -> tuple[bool, float | None]: + """ + Check if a period passes the quality gate (internal CV not too high). + + The Quality Gate prevents relaxation from creating periods with too much + internal price variation. A "best price period" with prices ranging from + 0.5 to 1.0 kr/kWh is not useful - user can't trust it's actually "best". + + Args: + period: Period summary dict with "start" and "end" datetime + all_prices: All price intervals (to look up prices for CV calculation) + time: Time service for interval time parsing + + Returns: + Tuple of (passes_quality_gate, cv_value) + - passes_quality_gate: True if CV <= PERIOD_MAX_CV + - cv_value: Calculated CV as percentage, or None if not calculable + + """ + start_time = period.get("start") + end_time = period.get("end") + + if not start_time or not end_time: + return True, None # Can't check, assume OK + + # Build lookup for prices + price_lookup: dict[str, float] = {} + for price_data in all_prices: + interval_time = time.get_interval_time(price_data) + if interval_time: + price_lookup[interval_time.isoformat()] = float(price_data["total"]) + + # Collect prices within the period + period_prices: list[float] = [] + interval_duration = time.get_interval_duration() + + current = start_time + while current < end_time: + price = price_lookup.get(current.isoformat()) + if price is not None: + period_prices.append(price) + current = current + interval_duration + + # Need at least 2 prices 
to calculate CV (same as MIN_PRICES_FOR_VOLATILITY in price.py) + min_prices_for_cv = 2 + if len(period_prices) < min_prices_for_cv: + return True, None # Too few prices to calculate CV + + cv = calculate_coefficient_of_variation(period_prices) + if cv is None: + return True, None + + passes = cv <= PERIOD_MAX_CV + return passes, cv + + +def _count_quality_periods( + periods: list[dict], + all_prices: list[dict], + prices_by_day: dict[date, list[dict]], + min_periods: int, + *, + time: TibberPricesTimeService, +) -> tuple[int, int]: + """ + Count days meeting requirement when considering quality gate. + + Only periods passing the quality gate (CV <= PERIOD_MAX_CV) are counted + towards meeting the min_periods requirement. + + Args: + periods: List of all periods + all_prices: All price intervals + prices_by_day: Price intervals grouped by day + min_periods: Target periods per day + time: Time service + + Returns: + Tuple of (days_meeting_requirement, total_quality_periods) + + """ + periods_by_day = group_periods_by_day(periods) + days_meeting_requirement = 0 + total_quality_periods = 0 + + for day in sorted(prices_by_day.keys()): + day_periods = periods_by_day.get(day, []) + quality_count = 0 + + for period in day_periods: + passes, cv = _check_period_quality(period, all_prices, time=time) + if passes: + quality_count += 1 + else: + _LOGGER_DETAILS.debug( + "%s Day %s: Period %s-%s REJECTED by quality gate (CV=%.1f%% > %.1f%%)", + INDENT_L2, + day, + period.get("start", "?").strftime("%H:%M") if hasattr(period.get("start"), "strftime") else "?", + period.get("end", "?").strftime("%H:%M") if hasattr(period.get("end"), "strftime") else "?", + cv or 0, + PERIOD_MAX_CV, + ) + + total_quality_periods += quality_count + if quality_count >= min_periods: + days_meeting_requirement += 1 + + return days_meeting_requirement, total_quality_periods + def group_periods_by_day(periods: list[dict]) -> dict[date, list[dict]]: """ @@ -137,7 +258,167 @@ def 
group_prices_by_day(all_prices: list[dict], *, time: TibberPricesTimeService return prices_by_day -def calculate_periods_with_relaxation( # noqa: PLR0913, PLR0915 - Per-day relaxation requires many parameters and statements +def _try_min_duration_fallback( + *, + config: TibberPricesPeriodConfig, + existing_periods: list[dict], + prices_by_day: dict[date, list[dict]], + time: TibberPricesTimeService, +) -> tuple[dict[str, Any] | None, dict[str, Any]]: + """ + Try reducing min_period_length to find periods when relaxation is exhausted. + + This is a LAST RESORT mechanism. It only activates when: + 1. All relaxation phases have been tried + 2. Some days STILL have zero periods (not just below min_periods) + + The fallback progressively reduces min_period_length: + - 60 min (default) → 45 min → 30 min (minimum) + + It does NOT reduce below 30 min (2 intervals) because a single 15-min + interval is essentially just the daily min/max price - not a "period". + + Args: + config: Period configuration + existing_periods: Periods found so far (from relaxation) + prices_by_day: Price intervals grouped by day + time: Time service instance + + Returns: + Tuple of (result dict with periods, metadata dict) or (None, empty metadata) + + """ + from .core import calculate_periods # noqa: PLC0415 - Avoid circular import + + metadata: dict[str, Any] = {"phases_used": [], "fallback_active": False} + + # Only try fallback if current min_period_length > minimum + if config.min_period_length <= MIN_DURATION_FALLBACK_MINIMUM: + return None, metadata + + # Check which days have ZERO periods (not just below target) + existing_by_day = group_periods_by_day(existing_periods) + days_with_zero_periods = [day for day in prices_by_day if not existing_by_day.get(day)] + + if not days_with_zero_periods: + _LOGGER_DETAILS.debug( + "%sMin duration fallback: All days have at least one period - no fallback needed", + INDENT_L1, + ) + return None, metadata + + _LOGGER.info( + "Min duration fallback: %d 
day(s) have zero periods, trying shorter min_period_length...", + len(days_with_zero_periods), + ) + + # Try progressively shorter min_period_length + current_min_duration = config.min_period_length + fallback_periods: list[dict] = [] + + while current_min_duration > MIN_DURATION_FALLBACK_MINIMUM: + current_min_duration = max( + current_min_duration - MIN_DURATION_FALLBACK_STEP, + MIN_DURATION_FALLBACK_MINIMUM, + ) + + _LOGGER_DETAILS.debug( + "%sTrying min_period_length=%d min for days with zero periods", + INDENT_L2, + current_min_duration, + ) + + # Create modified config with shorter min_period_length + # Use maxed-out flex (50%) since we're in fallback mode + fallback_config = TibberPricesPeriodConfig( + reverse_sort=config.reverse_sort, + flex=MAX_FLEX_HARD_LIMIT, # Max flex + min_distance_from_avg=0, # Disable min_distance in fallback + min_period_length=current_min_duration, + threshold_low=config.threshold_low, + threshold_high=config.threshold_high, + threshold_volatility_moderate=config.threshold_volatility_moderate, + threshold_volatility_high=config.threshold_volatility_high, + threshold_volatility_very_high=config.threshold_volatility_very_high, + level_filter=None, # Disable level filter + gap_count=config.gap_count, + ) + + # Try to find periods for days with zero periods + for day in days_with_zero_periods: + day_prices = prices_by_day.get(day, []) + if not day_prices: + continue + + try: + day_result = calculate_periods( + day_prices, + config=fallback_config, + time=time, + ) + + day_periods = day_result.get("periods", []) + if day_periods: + # Mark periods with fallback metadata + for period in day_periods: + period["duration_fallback_active"] = True + period["duration_fallback_min_length"] = current_min_duration + period["relaxation_active"] = True + period["relaxation_level"] = f"duration_fallback={current_min_duration}min" + + fallback_periods.extend(day_periods) + _LOGGER.info( + "Min duration fallback: Found %d period(s) for %s at 
min_length=%d min", + len(day_periods), + day, + current_min_duration, + ) + + except (KeyError, ValueError, TypeError) as err: + _LOGGER.warning( + "Error during min duration fallback for %s: %s", + day, + err, + ) + continue + + # If we found periods for all zero-period days, we can stop + if fallback_periods: + # Remove days that now have periods from the list + fallback_by_day = group_periods_by_day(fallback_periods) + days_with_zero_periods = [day for day in days_with_zero_periods if not fallback_by_day.get(day)] + + if not days_with_zero_periods: + break + + if fallback_periods: + # Merge with existing periods + # resolve_period_overlaps merges adjacent/overlapping periods + merged_periods, _new_count = resolve_period_overlaps( + existing_periods, + fallback_periods, + ) + recalculate_period_metadata(merged_periods, time=time) + + metadata["fallback_active"] = True + metadata["phases_used"] = [f"duration_fallback (min_length={current_min_duration}min)"] + + _LOGGER.info( + "Min duration fallback complete: Added %d period(s), total now %d", + len(fallback_periods), + len(merged_periods), + ) + + return {"periods": merged_periods}, metadata + + _LOGGER.warning( + "Min duration fallback: Still %d day(s) with zero periods after trying all durations", + len(days_with_zero_periods), + ) + return None, metadata + + +def calculate_periods_with_relaxation( # noqa: PLR0912, PLR0913, PLR0915 - Per-day relaxation requires many parameters and branches all_prices: list[dict], *, config: TibberPricesPeriodConfig, @@ -185,6 +466,9 @@ def calculate_periods_with_relaxation( # noqa: PLR0913, PLR0915 - Per-day relax from .core import ( # noqa: PLC0415 calculate_periods, ) + from .period_building import ( # noqa: PLC0415 + filter_superseded_periods, + ) # Compact INFO-level summary period_type = "PEAK PRICE" if config.reverse_sort else "BEST PRICE" @@ -338,6 +622,37 @@ def calculate_periods_with_relaxation( # noqa: PLR0913, PLR0915 - Per-day relax period_count = len(day_periods) 
if period_count >= min_periods: days_meeting_requirement += 1 + + # === MIN DURATION FALLBACK === + # If still no periods after relaxation, try reducing min_period_length + # This is a last resort to ensure users always get SOME period + if days_meeting_requirement < total_days and config.min_period_length > MIN_DURATION_FALLBACK_MINIMUM: + _LOGGER.info( + "Relaxation incomplete (%d/%d days). Trying min_duration fallback...", + days_meeting_requirement, + total_days, + ) + + fallback_result, fallback_metadata = _try_min_duration_fallback( + config=config, + existing_periods=all_periods, + prices_by_day=prices_by_day, + time=time, + ) + + if fallback_result: + all_periods = fallback_result["periods"] + all_phases_used.extend(fallback_metadata.get("phases_used", [])) + + # Recount after fallback + periods_by_day = group_periods_by_day(all_periods) + days_meeting_requirement = 0 + for day in sorted(prices_by_day.keys()): + day_periods = periods_by_day.get(day, []) + period_count = len(day_periods) + if period_count >= min_periods: + days_meeting_requirement += 1 + elif enable_relaxation: _LOGGER_DETAILS.debug( "%sAll %d days met target with baseline - no relaxation needed", @@ -351,6 +666,14 @@ def calculate_periods_with_relaxation( # noqa: PLR0913, PLR0915 - Per-day relax # Recalculate metadata for combined periods recalculate_period_metadata(all_periods, time=time) + # Apply cross-day supersession filter (only for best-price periods) + # This removes late-night today periods that are superseded by better tomorrow alternatives + all_periods = filter_superseded_periods( + all_periods, + time=time, + reverse_sort=config.reverse_sort, + ) + # Build final result final_result = baseline_result.copy() final_result["periods"] = all_periods @@ -491,23 +814,11 @@ def relax_all_prices( # noqa: PLR0913 - Comprehensive filter relaxation require new_relaxed_periods=new_periods, ) - # Count periods per day to check if requirement met - periods_by_day = 
group_periods_by_day(combined) - days_meeting_requirement = 0 - - for day in sorted(prices_by_day.keys()): - day_periods = periods_by_day.get(day, []) - period_count = len(day_periods) - if period_count >= min_periods: - days_meeting_requirement += 1 - - _LOGGER_DETAILS.debug( - "%s Day %s: %d periods%s", - INDENT_L2, - day, - period_count, - " ✓" if period_count >= min_periods else f" (need {min_periods})", - ) + # Count periods per day with QUALITY GATE check + # Only periods with CV <= PERIOD_MAX_CV count towards min_periods requirement + days_meeting_requirement, quality_period_count = _count_quality_periods( + combined, all_prices, prices_by_day, min_periods, time=time + ) total_periods = len(combined) _LOGGER_DETAILS.debug( diff --git a/custom_components/tibber_prices/coordinator/period_handlers/types.py b/custom_components/tibber_prices/coordinator/period_handlers/types.py index 87dc4d9..142cd58 100644 --- a/custom_components/tibber_prices/coordinator/period_handlers/types.py +++ b/custom_components/tibber_prices/coordinator/period_handlers/types.py @@ -15,6 +15,24 @@ from custom_components.tibber_prices.const import ( DEFAULT_VOLATILITY_THRESHOLD_VERY_HIGH, ) +# Quality Gate: Maximum coefficient of variation (CV) allowed within a period +# Periods with internal CV above this are considered too heterogeneous for "best price" +# A 25% CV means the std dev is 25% of the mean - beyond this, prices vary too much +# Example: Period with prices 0.7-0.99 kr has ~15% CV which is acceptable +# Period with prices 0.5-1.0 kr has ~30% CV which would be rejected +PERIOD_MAX_CV = 25.0 # 25% max coefficient of variation within a period + +# Cross-Day Extension: Time window constants +# When a period ends late in the day and tomorrow data is available, +# we can extend it past midnight if prices remain favorable +CROSS_DAY_LATE_PERIOD_START_HOUR = 20 # Consider periods starting at 20:00 or later for extension +CROSS_DAY_MAX_EXTENSION_HOUR = 8 # Don't extend beyond 08:00 
next day (covers typical night low) + +# Cross-Day Supersession: When tomorrow data arrives, late-night periods that are +# worse than early-morning tomorrow periods become obsolete +# A today period is "superseded" if tomorrow has a significantly better alternative +SUPERSESSION_PRICE_IMPROVEMENT_PCT = 10.0 # Tomorrow must be at least 10% cheaper to supersede + # Log indentation levels for visual hierarchy INDENT_L0 = "" # Top level (calculate_periods_with_relaxation) INDENT_L1 = " " # Per-day loop @@ -62,6 +80,7 @@ class TibberPricesPeriodStatistics(NamedTuple): price_max: float price_spread: float volatility: str + coefficient_of_variation: float | None # CV as percentage (e.g., 15.0 for 15%) period_price_diff: float | None period_price_diff_pct: float | None