hass.tibber_prices/custom_components/tibber_prices/period_utils/period_building.py
Julian Pawlowski 23b8bd1c62 feat(periods): modularize period_utils and add statistical outlier filtering
Refactored monolithic period_utils.py (1800 lines) into focused modules
for better maintainability and added advanced outlier filtering with
smart impact tracking.

Modular structure:
- types.py: Type definitions and constants (89 lines)
- level_filtering.py: Level filtering with gap tolerance (121 lines)
- period_building.py: Period construction from intervals (238 lines)
- period_statistics.py: Statistics and summaries (318 lines)
- period_merging.py: Overlap resolution (382 lines)
- relaxation.py: Per-day relaxation strategy (547 lines)
- core.py: Main API orchestration (251 lines)
- outlier_filtering.py: Statistical spike detection (294 lines)
- __init__.py: Public API exports (62 lines)

New statistical outlier filtering:
- Linear regression for trend-based spike detection
- 2 standard deviation confidence intervals (95%)
- Symmetry checking to preserve legitimate price shifts
- Enhanced zigzag detection with relative volatility (catches clusters)
- Replaces simple average smoothing with trend-based predictions

Smart impact tracking:
- Tests if original price would have passed criteria
- Only counts smoothed intervals that actually changed period formation
- Tracks level gap tolerance usage separately
- Both attributes only appear when > 0 (clean UI)

New period attributes:
- period_interval_smoothed_count: Intervals kept via outlier smoothing
- period_interval_level_gap_count: Intervals kept via gap tolerance

Impact: Statistical outlier filtering prevents isolated price spikes from
breaking continuous periods while preserving data integrity. All statistics
use original prices. Smart tracking shows only meaningful interventions,
making it clear when tolerance mechanisms actually influenced results.

Backwards compatible: All public APIs re-exported from period_utils package.
2025-11-12 16:37:34 +00:00

247 lines
9.5 KiB
Python

"""Period building and basic filtering logic."""
from __future__ import annotations
import logging
from datetime import date, timedelta
from typing import Any
from custom_components.tibber_prices.const import PRICE_LEVEL_MAPPING
from custom_components.tibber_prices.period_utils.level_filtering import (
apply_level_filter,
check_interval_criteria,
)
from custom_components.tibber_prices.period_utils.types import (
MINUTES_PER_INTERVAL,
IntervalCriteria,
)
from homeassistant.util import dt as dt_util
_LOGGER = logging.getLogger(__name__)
# Module-local log indentation (each module starts at level 0)
INDENT_L0 = "" # Entry point / main function
def split_intervals_by_day(all_prices: list[dict]) -> tuple[dict[date, list[dict]], dict[date, float]]:
    """Group price intervals by local calendar day and compute each day's mean price.

    Entries whose "startsAt" timestamp cannot be parsed are skipped.

    Returns:
        Tuple of (intervals keyed by local date, average "total" price keyed by local date).
    """
    by_day: dict[date, list[dict]] = {}
    for entry in all_prices:
        parsed = dt_util.parse_datetime(entry["startsAt"])
        if parsed is None:
            continue  # Unparseable timestamp: drop the interval entirely
        day = dt_util.as_local(parsed).date()
        by_day.setdefault(day, []).append(entry)
    averages: dict[date, float] = {
        day: sum(float(item["total"]) for item in items) / len(items)
        for day, items in by_day.items()
    }
    return by_day, averages
def calculate_reference_prices(intervals_by_day: dict[date, list[dict]], *, reverse_sort: bool) -> dict[date, float]:
    """Compute each day's reference price.

    Peak mode (reverse_sort=True) uses the daily maximum; best-price mode
    (reverse_sort=False) uses the daily minimum.
    """
    extremum = max if reverse_sort else min
    return {
        day: extremum(float(entry["total"]) for entry in entries)
        for day, entries in intervals_by_day.items()
    }
def build_periods(  # noqa: PLR0915 - Complex period building logic requires many statements
    all_prices: list[dict],
    price_context: dict[str, Any],
    *,
    reverse_sort: bool,
    level_filter: str | None = None,
    gap_count: int = 0,
) -> list[list[dict]]:
    """
    Build periods, allowing periods to cross midnight (day boundary).

    Periods are built day-by-day, comparing each interval to its own day's reference.
    When a day boundary is crossed, the current period is ended.
    Adjacent periods at midnight are merged in a later step.

    Args:
        all_prices: All price data points
        price_context: Dict with ref_prices, avg_prices, flex, min_distance_from_avg
        reverse_sort: True for peak price (high prices), False for best price (low prices)
        level_filter: Level filter string ("cheap", "expensive", "any", None)
        gap_count: Number of allowed consecutive intervals deviating by exactly 1 level step

    Returns:
        List of periods; each period is a list of interval dicts carrying the
        original (unsmoothed) price plus smoothing/gap bookkeeping flags.
    """
    ref_prices = price_context["ref_prices"]
    avg_prices = price_context["avg_prices"]
    flex = price_context["flex"]
    min_distance_from_avg = price_context["min_distance_from_avg"]

    # Calculate level_order if level_filter is active
    level_order = None
    level_filter_active = False
    if level_filter and level_filter.lower() != "any":
        level_order = PRICE_LEVEL_MAPPING.get(level_filter.upper(), 0)
        level_filter_active = True
        # BUG FIX: both branches previously assigned the empty string (the
        # comparison symbols were presumably "≥"/"≤" lost to an encoding issue),
        # so the debug message read "interval level  filter level".
        # Peak mode requires the interval level at or above the filter level;
        # best-price mode at or below.
        filter_direction = ">=" if reverse_sort else "<="
        gap_info = f", gap_tolerance={gap_count}" if gap_count > 0 else ""
        _LOGGER.debug(
            "%sLevel filter active: %s (order %s, require interval level %s filter level%s)",
            INDENT_L0,
            level_filter.upper(),
            level_order,
            filter_direction,
            gap_info,
        )
    else:
        status = "RELAXED to ANY" if (level_filter and level_filter.lower() == "any") else "DISABLED (not configured)"
        _LOGGER.debug("%sLevel filter: %s (accepting all levels)", INDENT_L0, status)

    periods: list[list[dict]] = []
    current_period: list[dict] = []
    last_ref_date: date | None = None
    consecutive_gaps = 0  # Track consecutive intervals that deviate by 1 level step
    intervals_checked = 0
    intervals_filtered_by_level = 0

    for price_data in all_prices:
        starts_at = dt_util.parse_datetime(price_data["startsAt"])
        if starts_at is None:
            continue  # Skip intervals with unparseable timestamps
        starts_at = dt_util.as_local(starts_at)
        date_key = starts_at.date()

        # Use smoothed price for criteria checks (flex/distance)
        # but preserve original price for period data
        price_for_criteria = float(price_data["total"])  # Smoothed if this interval was an outlier
        price_original = float(price_data.get("_original_price", price_data["total"]))
        intervals_checked += 1

        # Check flex and minimum distance criteria (using smoothed price)
        criteria = IntervalCriteria(
            ref_price=ref_prices[date_key],
            avg_price=avg_prices[date_key],
            flex=flex,
            min_distance_from_avg=min_distance_from_avg,
            reverse_sort=reverse_sort,
        )
        in_flex, meets_min_distance = check_interval_criteria(price_for_criteria, criteria)

        # If this interval was smoothed, check if smoothing actually made a difference
        smoothing_was_impactful = False
        if price_data.get("_smoothed", False):
            # Check if original price would have passed the same criteria
            in_flex_original, meets_min_distance_original = check_interval_criteria(price_original, criteria)
            # Smoothing was impactful if original would have failed but smoothed passed
            smoothing_was_impactful = (in_flex and meets_min_distance) and not (
                in_flex_original and meets_min_distance_original
            )

        # Level filter: Check if interval meets level requirement with gap tolerance
        meets_level, consecutive_gaps, is_level_gap = apply_level_filter(
            price_data, level_order, consecutive_gaps, gap_count, reverse_sort=reverse_sort
        )
        if not meets_level:
            intervals_filtered_by_level += 1

        # Split period if day changes
        if last_ref_date is not None and date_key != last_ref_date and current_period:
            periods.append(current_period)
            current_period = []
            consecutive_gaps = 0  # Reset gap counter on day boundary
        last_ref_date = date_key

        # Add to period if all criteria are met
        if in_flex and meets_min_distance and meets_level:
            current_period.append(
                {
                    "interval_hour": starts_at.hour,
                    "interval_minute": starts_at.minute,
                    "interval_time": f"{starts_at.hour:02d}:{starts_at.minute:02d}",
                    "price": price_original,  # Use original price in period data
                    "interval_start": starts_at,
                    "smoothing_was_impactful": smoothing_was_impactful,  # Only True if smoothing changed outcome
                    "is_level_gap": is_level_gap,  # Track if kept due to level gap tolerance
                }
            )
        elif current_period:
            # Criteria no longer met, end current period
            periods.append(current_period)
            current_period = []
            consecutive_gaps = 0  # Reset gap counter

    # Add final period if exists
    if current_period:
        periods.append(current_period)

    # Log summary
    if level_filter_active and intervals_checked > 0:
        filtered_pct = (intervals_filtered_by_level / intervals_checked) * 100
        _LOGGER.debug(
            "%sLevel filter summary: %d/%d intervals filtered (%.1f%%)",
            INDENT_L0,
            intervals_filtered_by_level,
            intervals_checked,
            filtered_pct,
        )
    return periods
def filter_periods_by_min_length(periods: list[list[dict]], min_period_length: int) -> list[list[dict]]:
    """Drop periods shorter than *min_period_length* minutes.

    The minimum is converted (floor division) into a required interval count.
    """
    required_intervals = min_period_length // MINUTES_PER_INTERVAL
    return list(filter(lambda period: len(period) >= required_intervals, periods))
def add_interval_ends(periods: list[list[dict]]) -> None:
    """Annotate each interval in-place with its end time (start + one interval length).

    Intervals lacking a truthy "interval_start" are left untouched.
    """
    duration = timedelta(minutes=MINUTES_PER_INTERVAL)
    for period in periods:
        for entry in period:
            begin = entry.get("interval_start")
            if begin:
                entry["interval_end"] = begin + duration
def filter_periods_by_end_date(periods: list[list[dict]]) -> list[list[dict]]:
    """
    Filter periods to keep only relevant ones for today and tomorrow.

    A period survives when its final interval:
    - ends in the future (> now), or
    - ends today strictly after local midnight.

    This discards periods that ended yesterday or exactly at midnight today
    (completely in the past), as well as empty periods and periods whose last
    interval has no "interval_end".
    """
    current = dt_util.now()
    current_date = current.date()
    day_start = dt_util.start_of_local_day(current)
    kept: list[list[dict]] = []
    for period in periods:
        if not period:
            continue
        # End time of the period is the last interval's end
        period_end = period[-1].get("interval_end")
        if not period_end:
            continue
        ends_in_future = period_end > current
        ends_later_today = period_end.date() == current_date and period_end > day_start
        if ends_in_future or ends_later_today:
            kept.append(period)
    return kept