hass.tibber_prices/custom_components/tibber_prices/period_utils/period_building.py

248 lines
9.5 KiB
Python

"""Period building and basic filtering logic."""
from __future__ import annotations
import logging
from datetime import date, timedelta
from typing import Any
from custom_components.tibber_prices.const import PRICE_LEVEL_MAPPING
from custom_components.tibber_prices.period_utils.level_filtering import (
apply_level_filter,
check_interval_criteria,
)
from custom_components.tibber_prices.period_utils.types import (
MINUTES_PER_INTERVAL,
IntervalCriteria,
)
from homeassistant.util import dt as dt_util
_LOGGER = logging.getLogger(__name__)
# Module-local log indentation (each module starts at level 0)
INDENT_L0 = "" # Entry point / main function
def split_intervals_by_day(all_prices: list[dict]) -> tuple[dict[date, list[dict]], dict[date, float]]:
"""Split intervals by day and calculate average price per day."""
intervals_by_day: dict[date, list[dict]] = {}
avg_price_by_day: dict[date, float] = {}
for price_data in all_prices:
dt = dt_util.parse_datetime(price_data["startsAt"])
if dt is None:
continue
dt = dt_util.as_local(dt)
date_key = dt.date()
intervals_by_day.setdefault(date_key, []).append(price_data)
for date_key, intervals in intervals_by_day.items():
avg_price_by_day[date_key] = sum(float(p["total"]) for p in intervals) / len(intervals)
return intervals_by_day, avg_price_by_day
def calculate_reference_prices(intervals_by_day: dict[date, list[dict]], *, reverse_sort: bool) -> dict[date, float]:
"""Calculate reference prices for each day (min for best, max for peak)."""
ref_prices: dict[date, float] = {}
for date_key, intervals in intervals_by_day.items():
prices = [float(p["total"]) for p in intervals]
ref_prices[date_key] = max(prices) if reverse_sort else min(prices)
return ref_prices
def build_periods( # noqa: PLR0915 - Complex period building logic requires many statements
all_prices: list[dict],
price_context: dict[str, Any],
*,
reverse_sort: bool,
level_filter: str | None = None,
gap_count: int = 0,
) -> list[list[dict]]:
"""
Build periods, allowing periods to cross midnight (day boundary).
Periods are built day-by-day, comparing each interval to its own day's reference.
When a day boundary is crossed, the current period is ended.
Adjacent periods at midnight are merged in a later step.
Args:
all_prices: All price data points
price_context: Dict with ref_prices, avg_prices, flex, min_distance_from_avg
reverse_sort: True for peak price (high prices), False for best price (low prices)
level_filter: Level filter string ("cheap", "expensive", "any", None)
gap_count: Number of allowed consecutive intervals deviating by exactly 1 level step
"""
ref_prices = price_context["ref_prices"]
avg_prices = price_context["avg_prices"]
flex = price_context["flex"]
min_distance_from_avg = price_context["min_distance_from_avg"]
# Calculate level_order if level_filter is active
level_order = None
level_filter_active = False
if level_filter and level_filter.lower() != "any":
level_order = PRICE_LEVEL_MAPPING.get(level_filter.upper(), 0)
level_filter_active = True
filter_direction = "" if reverse_sort else ""
gap_info = f", gap_tolerance={gap_count}" if gap_count > 0 else ""
_LOGGER.debug(
"%sLevel filter active: %s (order %s, require interval level %s filter level%s)",
INDENT_L0,
level_filter.upper(),
level_order,
filter_direction,
gap_info,
)
else:
status = "RELAXED to ANY" if (level_filter and level_filter.lower() == "any") else "DISABLED (not configured)"
_LOGGER.debug("%sLevel filter: %s (accepting all levels)", INDENT_L0, status)
periods: list[list[dict]] = []
current_period: list[dict] = []
last_ref_date: date | None = None
consecutive_gaps = 0 # Track consecutive intervals that deviate by 1 level step
intervals_checked = 0
intervals_filtered_by_level = 0
for price_data in all_prices:
starts_at = dt_util.parse_datetime(price_data["startsAt"])
if starts_at is None:
continue
starts_at = dt_util.as_local(starts_at)
date_key = starts_at.date()
# Use smoothed price for criteria checks (flex/distance)
# but preserve original price for period data
price_for_criteria = float(price_data["total"]) # Smoothed if this interval was an outlier
price_original = float(price_data.get("_original_price", price_data["total"]))
intervals_checked += 1
# Check flex and minimum distance criteria (using smoothed price)
criteria = IntervalCriteria(
ref_price=ref_prices[date_key],
avg_price=avg_prices[date_key],
flex=flex,
min_distance_from_avg=min_distance_from_avg,
reverse_sort=reverse_sort,
)
in_flex, meets_min_distance = check_interval_criteria(price_for_criteria, criteria)
# If this interval was smoothed, check if smoothing actually made a difference
smoothing_was_impactful = False
if price_data.get("_smoothed", False):
# Check if original price would have passed the same criteria
in_flex_original, meets_min_distance_original = check_interval_criteria(price_original, criteria)
# Smoothing was impactful if original would have failed but smoothed passed
smoothing_was_impactful = (in_flex and meets_min_distance) and not (
in_flex_original and meets_min_distance_original
)
# Level filter: Check if interval meets level requirement with gap tolerance
meets_level, consecutive_gaps, is_level_gap = apply_level_filter(
price_data, level_order, consecutive_gaps, gap_count, reverse_sort=reverse_sort
)
if not meets_level:
intervals_filtered_by_level += 1
# Split period if day changes
if last_ref_date is not None and date_key != last_ref_date and current_period:
periods.append(current_period)
current_period = []
consecutive_gaps = 0 # Reset gap counter on day boundary
last_ref_date = date_key
# Add to period if all criteria are met
if in_flex and meets_min_distance and meets_level:
current_period.append(
{
"interval_hour": starts_at.hour,
"interval_minute": starts_at.minute,
"interval_time": f"{starts_at.hour:02d}:{starts_at.minute:02d}",
"price": price_original, # Use original price in period data
"interval_start": starts_at,
# Only True if smoothing changed whether the interval qualified for period inclusion
"smoothing_was_impactful": smoothing_was_impactful,
"is_level_gap": is_level_gap, # Track if kept due to level gap tolerance
}
)
elif current_period:
# Criteria no longer met, end current period
periods.append(current_period)
current_period = []
consecutive_gaps = 0 # Reset gap counter
# Add final period if exists
if current_period:
periods.append(current_period)
# Log summary
if level_filter_active and intervals_checked > 0:
filtered_pct = (intervals_filtered_by_level / intervals_checked) * 100
_LOGGER.debug(
"%sLevel filter summary: %d/%d intervals filtered (%.1f%%)",
INDENT_L0,
intervals_filtered_by_level,
intervals_checked,
filtered_pct,
)
return periods
def filter_periods_by_min_length(periods: list[list[dict]], min_period_length: int) -> list[list[dict]]:
"""Filter periods to only include those meeting the minimum length requirement."""
min_intervals = min_period_length // MINUTES_PER_INTERVAL
return [period for period in periods if len(period) >= min_intervals]
def add_interval_ends(periods: list[list[dict]]) -> None:
"""Add interval_end to each interval in-place."""
for period in periods:
for interval in period:
start = interval.get("interval_start")
if start:
interval["interval_end"] = start + timedelta(minutes=MINUTES_PER_INTERVAL)
def filter_periods_by_end_date(periods: list[list[dict]]) -> list[list[dict]]:
"""
Filter periods to keep only relevant ones for today and tomorrow.
Keep periods that:
- End in the future (> now)
- End today but after the start of the day (not exactly at midnight)
This removes:
- Periods that ended yesterday
- Periods that ended exactly at midnight today (they're completely in the past)
"""
now = dt_util.now()
today = now.date()
midnight_today = dt_util.start_of_local_day(now)
filtered = []
for period in periods:
if not period:
continue
# Get the end time of the period (last interval's end)
last_interval = period[-1]
period_end = last_interval.get("interval_end")
if not period_end:
continue
# Keep if period ends in the future
if period_end > now:
filtered.append(period)
continue
# Keep if period ends today but AFTER midnight (not exactly at midnight)
if period_end.date() == today and period_end > midnight_today:
filtered.append(period)
return filtered