"""Utility functions for calculating price periods (best price and peak price).""" from __future__ import annotations import logging from datetime import date, datetime, timedelta from typing import TYPE_CHECKING, Any, NamedTuple if TYPE_CHECKING: from collections.abc import Callable from homeassistant.util import dt as dt_util from .const import ( DEFAULT_PRICE_RATING_THRESHOLD_HIGH, DEFAULT_PRICE_RATING_THRESHOLD_LOW, DEFAULT_VOLATILITY_THRESHOLD_HIGH, DEFAULT_VOLATILITY_THRESHOLD_MODERATE, DEFAULT_VOLATILITY_THRESHOLD_VERY_HIGH, PRICE_LEVEL_MAPPING, ) from .price_utils import ( aggregate_period_levels, aggregate_period_ratings, calculate_volatility_level, ) _LOGGER = logging.getLogger(__name__) MINUTES_PER_INTERVAL = 15 # Log indentation levels for visual hierarchy INDENT_L0 = "" # Top level (calculate_periods_with_relaxation) INDENT_L1 = " " # Per-day loop INDENT_L2 = " " # Flex/filter loop (_relax_single_day) INDENT_L3 = " " # _resolve_period_overlaps function INDENT_L4 = " " # Period-by-period analysis INDENT_L5 = " " # Segment details class PeriodConfig(NamedTuple): """Configuration for period calculation.""" reverse_sort: bool flex: float min_distance_from_avg: float min_period_length: int threshold_low: float = DEFAULT_PRICE_RATING_THRESHOLD_LOW threshold_high: float = DEFAULT_PRICE_RATING_THRESHOLD_HIGH threshold_volatility_moderate: float = DEFAULT_VOLATILITY_THRESHOLD_MODERATE threshold_volatility_high: float = DEFAULT_VOLATILITY_THRESHOLD_HIGH threshold_volatility_very_high: float = DEFAULT_VOLATILITY_THRESHOLD_VERY_HIGH level_filter: str | None = None # "any", "cheap", "expensive", etc. or None gap_count: int = 0 # Number of allowed consecutive deviating intervals class PeriodData(NamedTuple): """Data for building a period summary.""" start_time: datetime end_time: datetime period_length: int period_idx: int total_periods: int class PeriodStatistics(NamedTuple): """Calculated statistics for a period.""" aggregated_level: str | None aggregated_rating: str | None rating_difference_pct: float | None price_avg: float price_min: float price_max: float price_spread: float volatility: str period_price_diff: float | None period_price_diff_pct: float | None class ThresholdConfig(NamedTuple): """Threshold configuration for period calculations.""" threshold_low: float | None threshold_high: float | None threshold_volatility_moderate: float threshold_volatility_high: float threshold_volatility_very_high: float reverse_sort: bool class IntervalCriteria(NamedTuple): """Criteria for checking if an interval qualifies for a period.""" ref_price: float avg_price: float flex: float min_distance_from_avg: float reverse_sort: bool def calculate_periods( all_prices: list[dict], *, config: PeriodConfig, ) -> dict[str, Any]: """ Calculate price periods (best or peak) from price data. This function identifies periods but does NOT store full interval data redundantly. It returns lightweight period summaries that reference the original price data. Steps: 1. Split prices by day and calculate daily averages 2. Calculate reference prices (min/max per day) 3. Build periods based on criteria 4. Filter by minimum length 5. Merge adjacent periods at midnight 6. Extract period summaries (start/end times, not full price data) Args: all_prices: All price data points from yesterday/today/tomorrow config: Period configuration containing reverse_sort, flex, min_distance_from_avg, min_period_length, threshold_low, and threshold_high Returns: Dict with: - periods: List of lightweight period summaries (start/end times only) - metadata: Config and statistics - reference_data: Daily min/max/avg for on-demand annotation """ # Extract config values reverse_sort = config.reverse_sort flex = config.flex min_distance_from_avg = config.min_distance_from_avg min_period_length = config.min_period_length threshold_low = config.threshold_low threshold_high = config.threshold_high if not all_prices: return { "periods": [], "metadata": { "total_periods": 0, "config": { "reverse_sort": reverse_sort, "flex": flex, "min_distance_from_avg": min_distance_from_avg, "min_period_length": min_period_length, }, }, "reference_data": { "ref_prices": {}, "avg_prices": {}, }, } # Ensure prices are sorted chronologically all_prices_sorted = sorted(all_prices, key=lambda p: p["startsAt"]) # Step 1: Split by day and calculate averages intervals_by_day, avg_price_by_day = _split_intervals_by_day(all_prices_sorted) # Step 2: Calculate reference prices (min or max per day) ref_prices = _calculate_reference_prices(intervals_by_day, reverse_sort=reverse_sort) # Step 3: Build periods price_context = { "ref_prices": ref_prices, "avg_prices": avg_price_by_day, "flex": flex, "min_distance_from_avg": min_distance_from_avg, } raw_periods = _build_periods( all_prices_sorted, price_context, reverse_sort=reverse_sort, level_filter=config.level_filter, gap_count=config.gap_count, ) # Step 4: Filter by minimum length raw_periods = _filter_periods_by_min_length(raw_periods, min_period_length) # Step 5: Merge adjacent periods at midnight raw_periods = _merge_adjacent_periods_at_midnight(raw_periods) # Step 6: Add interval ends _add_interval_ends(raw_periods) # Step 7: Filter periods by end date (keep periods ending today or later) raw_periods = _filter_periods_by_end_date(raw_periods) # Step 8: Extract lightweight period summaries (no full price data) # Note: Filtering for current/future is done here based on end date, # not start date. This preserves periods that started yesterday but end today. thresholds = ThresholdConfig( threshold_low=threshold_low, threshold_high=threshold_high, threshold_volatility_moderate=config.threshold_volatility_moderate, threshold_volatility_high=config.threshold_volatility_high, threshold_volatility_very_high=config.threshold_volatility_very_high, reverse_sort=reverse_sort, ) period_summaries = _extract_period_summaries( raw_periods, all_prices_sorted, price_context, thresholds, ) return { "periods": period_summaries, # Lightweight summaries only "metadata": { "total_periods": len(period_summaries), "config": { "reverse_sort": reverse_sort, "flex": flex, "min_distance_from_avg": min_distance_from_avg, "min_period_length": min_period_length, }, }, "reference_data": { "ref_prices": {k.isoformat(): v for k, v in ref_prices.items()}, "avg_prices": {k.isoformat(): v for k, v in avg_price_by_day.items()}, }, } def _split_intervals_by_day(all_prices: list[dict]) -> tuple[dict[date, list[dict]], dict[date, float]]: """Split intervals by day and calculate average price per day.""" intervals_by_day: dict[date, list[dict]] = {} avg_price_by_day: dict[date, float] = {} for price_data in all_prices: dt = dt_util.parse_datetime(price_data["startsAt"]) if dt is None: continue dt = dt_util.as_local(dt) date_key = dt.date() intervals_by_day.setdefault(date_key, []).append(price_data) for date_key, intervals in intervals_by_day.items(): avg_price_by_day[date_key] = sum(float(p["total"]) for p in intervals) / len(intervals) return intervals_by_day, avg_price_by_day def _calculate_reference_prices(intervals_by_day: dict[date, list[dict]], *, reverse_sort: bool) -> dict[date, float]: """Calculate reference prices for each day (min for best, max for peak).""" ref_prices: dict[date, float] = {} for date_key, intervals in intervals_by_day.items(): prices = [float(p["total"]) for p in intervals] ref_prices[date_key] = max(prices) if reverse_sort else min(prices) return ref_prices def _check_level_with_gap_tolerance( interval_level: int, level_order: int, consecutive_gaps: int, gap_count: int, *, reverse_sort: bool, ) -> tuple[bool, bool, int]: """ Check if interval meets level requirement with gap tolerance. Args: interval_level: Level value of current interval (from PRICE_LEVEL_MAPPING) level_order: Required level value consecutive_gaps: Current count of consecutive gap intervals gap_count: Maximum allowed consecutive gap intervals reverse_sort: True for peak price, False for best price Returns: Tuple of (meets_level, is_gap, new_consecutive_gaps): - meets_level: True if interval qualifies (exact match or within gap tolerance) - is_gap: True if this is a gap interval (deviates by exactly 1 step) - new_consecutive_gaps: Updated gap counter """ if reverse_sort: # Peak price: interval must be >= level_order (e.g., EXPENSIVE or higher) meets_level_exact = interval_level >= level_order # Gap: exactly 1 step below (e.g., NORMAL when expecting EXPENSIVE) is_gap = interval_level == level_order - 1 else: # Best price: interval must be <= level_order (e.g., CHEAP or lower) meets_level_exact = interval_level <= level_order # Gap: exactly 1 step above (e.g., NORMAL when expecting CHEAP) is_gap = interval_level == level_order + 1 # Apply gap tolerance if meets_level_exact: return True, False, 0 # Meets level, not a gap, reset counter if is_gap and consecutive_gaps < gap_count: return True, True, consecutive_gaps + 1 # Allowed gap, increment counter return False, False, 0 # Doesn't meet level, reset counter def _apply_level_filter( price_data: dict, level_order: int | None, consecutive_gaps: int, gap_count: int, *, reverse_sort: bool, ) -> tuple[bool, int]: """ Apply level filter to a single interval. Args: price_data: Price data dict with "level" key level_order: Required level value (from PRICE_LEVEL_MAPPING) or None if disabled consecutive_gaps: Current count of consecutive gap intervals gap_count: Maximum allowed consecutive gap intervals reverse_sort: True for peak price, False for best price Returns: Tuple of (meets_level, new_consecutive_gaps) """ if level_order is None: return True, consecutive_gaps interval_level = PRICE_LEVEL_MAPPING.get(price_data.get("level", "NORMAL"), 0) meets_level, _is_gap, new_consecutive_gaps = _check_level_with_gap_tolerance( interval_level, level_order, consecutive_gaps, gap_count, reverse_sort=reverse_sort ) return meets_level, new_consecutive_gaps def _check_interval_criteria( price: float, criteria: IntervalCriteria, ) -> tuple[bool, bool]: """ Check if interval meets flex and minimum distance criteria. Args: price: Interval price criteria: Interval criteria (ref_price, avg_price, flex, etc.) Returns: Tuple of (in_flex, meets_min_distance) """ # Calculate percentage difference from reference percent_diff = ((price - criteria.ref_price) / criteria.ref_price) * 100 if criteria.ref_price != 0 else 0.0 # Check if interval qualifies for the period in_flex = percent_diff >= criteria.flex * 100 if criteria.reverse_sort else percent_diff <= criteria.flex * 100 # Minimum distance from average if criteria.reverse_sort: # Peak price: must be at least min_distance_from_avg% above average min_distance_threshold = criteria.avg_price * (1 + criteria.min_distance_from_avg / 100) meets_min_distance = price >= min_distance_threshold else: # Best price: must be at least min_distance_from_avg% below average min_distance_threshold = criteria.avg_price * (1 - criteria.min_distance_from_avg / 100) meets_min_distance = price <= min_distance_threshold return in_flex, meets_min_distance def _build_periods( all_prices: list[dict], price_context: dict[str, Any], *, reverse_sort: bool, level_filter: str | None = None, gap_count: int = 0, ) -> list[list[dict]]: """ Build periods, allowing periods to cross midnight (day boundary). Periods are built day-by-day, comparing each interval to its own day's reference. When a day boundary is crossed, the current period is ended. Adjacent periods at midnight are merged in a later step. Args: all_prices: All price data points price_context: Dict with ref_prices, avg_prices, flex, min_distance_from_avg reverse_sort: True for peak price (high prices), False for best price (low prices) level_filter: Level filter string ("cheap", "expensive", "any", None) gap_count: Number of allowed consecutive intervals deviating by exactly 1 level step """ ref_prices = price_context["ref_prices"] avg_prices = price_context["avg_prices"] flex = price_context["flex"] min_distance_from_avg = price_context["min_distance_from_avg"] # Calculate level_order if level_filter is active level_order = None level_filter_active = False if level_filter and level_filter.lower() != "any": level_order = PRICE_LEVEL_MAPPING.get(level_filter.upper(), 0) level_filter_active = True filter_direction = "≥" if reverse_sort else "≤" gap_info = f", gap_tolerance={gap_count}" if gap_count > 0 else "" _LOGGER.debug( "%sLevel filter active: %s (order %s, require interval level %s filter level%s)", INDENT_L3, level_filter.upper(), level_order, filter_direction, gap_info, ) else: status = "RELAXED to ANY" if (level_filter and level_filter.lower() == "any") else "DISABLED (not configured)" _LOGGER.debug("%sLevel filter: %s (accepting all levels)", INDENT_L3, status) periods: list[list[dict]] = [] current_period: list[dict] = [] last_ref_date: date | None = None consecutive_gaps = 0 # Track consecutive intervals that deviate by 1 level step intervals_checked = 0 intervals_filtered_by_level = 0 for price_data in all_prices: starts_at = dt_util.parse_datetime(price_data["startsAt"]) if starts_at is None: continue starts_at = dt_util.as_local(starts_at) date_key = starts_at.date() price = float(price_data["total"]) intervals_checked += 1 # Check flex and minimum distance criteria criteria = IntervalCriteria( ref_price=ref_prices[date_key], avg_price=avg_prices[date_key], flex=flex, min_distance_from_avg=min_distance_from_avg, reverse_sort=reverse_sort, ) in_flex, meets_min_distance = _check_interval_criteria(price, criteria) # Level filter: Check if interval meets level requirement with gap tolerance meets_level, consecutive_gaps = _apply_level_filter( price_data, level_order, consecutive_gaps, gap_count, reverse_sort=reverse_sort ) if not meets_level: intervals_filtered_by_level += 1 # Split period if day changes if last_ref_date is not None and date_key != last_ref_date and current_period: periods.append(current_period) current_period = [] consecutive_gaps = 0 # Reset gap counter on day boundary last_ref_date = date_key # Add to period if all criteria are met if in_flex and meets_min_distance and meets_level: current_period.append( { "interval_hour": starts_at.hour, "interval_minute": starts_at.minute, "interval_time": f"{starts_at.hour:02d}:{starts_at.minute:02d}", "price": price, "interval_start": starts_at, } ) elif current_period: # Criteria no longer met, end current period periods.append(current_period) current_period = [] consecutive_gaps = 0 # Reset gap counter # Add final period if exists if current_period: periods.append(current_period) # Log summary if level_filter_active and intervals_checked > 0: filtered_pct = (intervals_filtered_by_level / intervals_checked) * 100 _LOGGER.debug( "%sLevel filter summary: %d/%d intervals filtered (%.1f%%)", INDENT_L3, intervals_filtered_by_level, intervals_checked, filtered_pct, ) return periods def _filter_periods_by_min_length(periods: list[list[dict]], min_period_length: int) -> list[list[dict]]: """Filter periods to only include those meeting the minimum length requirement.""" min_intervals = min_period_length // MINUTES_PER_INTERVAL return [period for period in periods if len(period) >= min_intervals] def _merge_adjacent_periods_at_midnight(periods: list[list[dict]]) -> list[list[dict]]: """ Merge adjacent periods that meet at midnight. When two periods are detected separately for consecutive days but are directly adjacent at midnight (15 minutes apart), merge them into a single period. """ if not periods: return periods merged = [] i = 0 while i < len(periods): current_period = periods[i] # Check if there's a next period and if they meet at midnight if i + 1 < len(periods): next_period = periods[i + 1] last_start = current_period[-1].get("interval_start") next_start = next_period[0].get("interval_start") if last_start and next_start: time_diff = next_start - last_start last_date = last_start.date() next_date = next_start.date() # If they are 15 minutes apart and on different days (crossing midnight) if time_diff == timedelta(minutes=MINUTES_PER_INTERVAL) and next_date > last_date: # Merge the two periods merged_period = current_period + next_period merged.append(merged_period) i += 2 # Skip both periods as we've merged them continue # If no merge happened, just add the current period merged.append(current_period) i += 1 return merged def _add_interval_ends(periods: list[list[dict]]) -> None: """Add interval_end to each interval in-place.""" for period in periods: for interval in period: start = interval.get("interval_start") if start: interval["interval_end"] = start + timedelta(minutes=MINUTES_PER_INTERVAL) def _filter_periods_by_end_date(periods: list[list[dict]]) -> list[list[dict]]: """ Filter periods to keep only relevant ones for today and tomorrow. Keep periods that: - End in the future (> now) - End today but after the start of the day (not exactly at midnight) This removes: - Periods that ended yesterday - Periods that ended exactly at midnight today (they're completely in the past) """ now = dt_util.now() today = now.date() midnight_today = dt_util.start_of_local_day(now) filtered = [] for period in periods: if not period: continue # Get the end time of the period (last interval's end) last_interval = period[-1] period_end = last_interval.get("interval_end") if not period_end: continue # Keep if period ends in the future if period_end > now: filtered.append(period) continue # Keep if period ends today but AFTER midnight (not exactly at midnight) if period_end.date() == today and period_end > midnight_today: filtered.append(period) return filtered def _calculate_period_price_diff( price_avg: float, start_time: datetime, price_context: dict[str, Any], ) -> tuple[float | None, float | None]: """ Calculate period price difference from daily reference (min or max). Uses reference price from start day of the period for consistency. Returns: Tuple of (period_price_diff, period_price_diff_pct) or (None, None) if no reference available. """ if not price_context or not start_time: return None, None ref_prices = price_context.get("ref_prices", {}) date_key = start_time.date() ref_price = ref_prices.get(date_key) if ref_price is None: return None, None # Convert reference price to minor units (ct/øre) ref_price_minor = round(ref_price * 100, 2) period_price_diff = round(price_avg - ref_price_minor, 2) period_price_diff_pct = None if ref_price_minor != 0: period_price_diff_pct = round((period_price_diff / ref_price_minor) * 100, 2) return period_price_diff, period_price_diff_pct def _calculate_aggregated_rating_difference(period_price_data: list[dict]) -> float | None: """ Calculate aggregated rating difference percentage for the period. Takes the average of all interval differences (from their respective thresholds). Args: period_price_data: List of price data dictionaries with "difference" field Returns: Average difference percentage, or None if no valid data """ differences = [] for price_data in period_price_data: diff = price_data.get("difference") if diff is not None: differences.append(float(diff)) if not differences: return None return round(sum(differences) / len(differences), 2) def _calculate_period_price_statistics(period_price_data: list[dict]) -> dict[str, float]: """ Calculate price statistics for a period. Args: period_price_data: List of price data dictionaries with "total" field Returns: Dictionary with price_avg, price_min, price_max, price_spread (all in minor units: ct/øre) """ prices_minor = [round(float(p["total"]) * 100, 2) for p in period_price_data] if not prices_minor: return { "price_avg": 0.0, "price_min": 0.0, "price_max": 0.0, "price_spread": 0.0, } price_avg = round(sum(prices_minor) / len(prices_minor), 2) price_min = round(min(prices_minor), 2) price_max = round(max(prices_minor), 2) price_spread = round(price_max - price_min, 2) return { "price_avg": price_avg, "price_min": price_min, "price_max": price_max, "price_spread": price_spread, } def _build_period_summary_dict( period_data: PeriodData, stats: PeriodStatistics, *, reverse_sort: bool, ) -> dict: """ Build the complete period summary dictionary. Args: period_data: Period timing and position data stats: Calculated period statistics reverse_sort: True for peak price, False for best price (keyword-only) Returns: Complete period summary dictionary following attribute ordering """ # Build complete period summary (following attribute ordering from AGENTS.md) summary = { # 1. Time information (when does this apply?) "start": period_data.start_time, "end": period_data.end_time, "duration_minutes": period_data.period_length * MINUTES_PER_INTERVAL, # 2. Core decision attributes (what should I do?) "level": stats.aggregated_level, "rating_level": stats.aggregated_rating, "rating_difference_%": stats.rating_difference_pct, # 3. Price statistics (how much does it cost?) "price_avg": stats.price_avg, "price_min": stats.price_min, "price_max": stats.price_max, "price_spread": stats.price_spread, "volatility": stats.volatility, # 4. Price differences will be added below if available # 5. Detail information (additional context) "period_interval_count": period_data.period_length, "period_position": period_data.period_idx, "periods_total": period_data.total_periods, "periods_remaining": period_data.total_periods - period_data.period_idx, } # Add period price difference attributes based on sensor type (step 4) if stats.period_price_diff is not None: if reverse_sort: # Peak price sensor: compare to daily maximum summary["period_price_diff_from_daily_max"] = stats.period_price_diff if stats.period_price_diff_pct is not None: summary["period_price_diff_from_daily_max_%"] = stats.period_price_diff_pct else: # Best price sensor: compare to daily minimum summary["period_price_diff_from_daily_min"] = stats.period_price_diff if stats.period_price_diff_pct is not None: summary["period_price_diff_from_daily_min_%"] = stats.period_price_diff_pct return summary def _extract_period_summaries( periods: list[list[dict]], all_prices: list[dict], price_context: dict[str, Any], thresholds: ThresholdConfig, ) -> list[dict]: """ Extract complete period summaries with all aggregated attributes. Returns sensor-ready period summaries with: - Timestamps and positioning (start, end, hour, minute, time) - Aggregated price statistics (price_avg, price_min, price_max, price_spread) - Volatility categorization (low/moderate/high/very_high based on absolute spread) - Rating difference percentage (aggregated from intervals) - Period price differences (period_price_diff_from_daily_min/max) - Aggregated level and rating_level - Interval count (number of 15-min intervals in period) All data is pre-calculated and ready for display - no further processing needed. Args: periods: List of periods, where each period is a list of interval dictionaries all_prices: All price data from the API (enriched with level, difference, rating_level) price_context: Dictionary with ref_prices and avg_prices per day thresholds: Threshold configuration for calculations """ # Build lookup dictionary for full price data by timestamp price_lookup: dict[str, dict] = {} for price_data in all_prices: starts_at = dt_util.parse_datetime(price_data["startsAt"]) if starts_at: starts_at = dt_util.as_local(starts_at) price_lookup[starts_at.isoformat()] = price_data summaries = [] total_periods = len(periods) for period_idx, period in enumerate(periods, 1): if not period: continue first_interval = period[0] last_interval = period[-1] start_time = first_interval.get("interval_start") end_time = last_interval.get("interval_end") if not start_time or not end_time: continue # Look up full price data for each interval in the period period_price_data: list[dict] = [] for interval in period: start = interval.get("interval_start") if not start: continue start_iso = start.isoformat() price_data = price_lookup.get(start_iso) if price_data: period_price_data.append(price_data) # Calculate aggregated level and rating_level aggregated_level = None aggregated_rating = None if period_price_data: # Aggregate level (from API's "level" field) aggregated_level = aggregate_period_levels(period_price_data) # Aggregate rating_level (from calculated "rating_level" and "difference" fields) if thresholds.threshold_low is not None and thresholds.threshold_high is not None: aggregated_rating, _ = aggregate_period_ratings( period_price_data, thresholds.threshold_low, thresholds.threshold_high, ) # Calculate price statistics (in minor units: ct/øre) price_stats = _calculate_period_price_statistics(period_price_data) # Calculate period price difference from daily reference period_price_diff, period_price_diff_pct = _calculate_period_price_diff( price_stats["price_avg"], start_time, price_context ) # Calculate volatility (categorical) and aggregated rating difference (numeric) volatility = calculate_volatility_level( price_stats["price_spread"], threshold_moderate=thresholds.threshold_volatility_moderate, threshold_high=thresholds.threshold_volatility_high, threshold_very_high=thresholds.threshold_volatility_very_high, ).lower() rating_difference_pct = _calculate_aggregated_rating_difference(period_price_data) # Build period data and statistics objects period_data = PeriodData( start_time=start_time, end_time=end_time, period_length=len(period), period_idx=period_idx, total_periods=total_periods, ) stats = PeriodStatistics( aggregated_level=aggregated_level, aggregated_rating=aggregated_rating, rating_difference_pct=rating_difference_pct, price_avg=price_stats["price_avg"], price_min=price_stats["price_min"], price_max=price_stats["price_max"], price_spread=price_stats["price_spread"], volatility=volatility, period_price_diff=period_price_diff, period_price_diff_pct=period_price_diff_pct, ) # Build complete period summary summary = _build_period_summary_dict(period_data, stats, reverse_sort=thresholds.reverse_sort) summaries.append(summary) return summaries def _recalculate_period_metadata(periods: list[dict]) -> None: """ Recalculate period metadata after merging periods. Updates period_position, periods_total, and periods_remaining for all periods based on chronological order. This must be called after _resolve_period_overlaps() to ensure metadata reflects the final merged period list. Args: periods: List of period summary dicts (mutated in-place) """ if not periods: return # Sort periods chronologically by start time periods.sort(key=lambda p: p.get("start") or dt_util.now()) # Update metadata for all periods total_periods = len(periods) for position, period in enumerate(periods, 1): period["period_position"] = position period["periods_total"] = total_periods period["periods_remaining"] = total_periods - position def filter_periods_by_volatility( periods_data: dict[str, Any], min_volatility: str, ) -> dict[str, Any]: """ Filter calculated periods based on their internal volatility. This applies period-level volatility filtering AFTER periods have been calculated. Removes periods that don't meet the minimum volatility requirement based on their own price spread (volatility attribute), not the daily volatility. Args: periods_data: Dict with "periods" and "intervals" lists from calculate_periods_with_relaxation() min_volatility: Minimum volatility level required ("low", "moderate", "high", "very_high") Returns: Filtered periods_data dict with updated periods, intervals, and metadata. """ periods = periods_data.get("periods", []) if not periods: return periods_data # "low" means no filtering (accept any volatility level) if min_volatility == "low": return periods_data # Define volatility hierarchy (LOW < MODERATE < HIGH < VERY_HIGH) volatility_levels = ["LOW", "MODERATE", "HIGH", "VERY_HIGH"] # Map filter config values to actual level names config_to_level = { "low": "LOW", "moderate": "MODERATE", "high": "HIGH", "very_high": "VERY_HIGH", } min_level = config_to_level.get(min_volatility, "LOW") # Filter periods based on their volatility filtered_periods = [] for period in periods: period_volatility = period.get("volatility", "MODERATE") # Check if period's volatility meets or exceeds minimum requirement try: period_idx = volatility_levels.index(period_volatility) min_idx = volatility_levels.index(min_level) except ValueError: # If level not found, don't filter out this period filtered_periods.append(period) else: if period_idx >= min_idx: filtered_periods.append(period) # If no periods left after filtering, return empty structure if not filtered_periods: return { "periods": [], "intervals": [], "metadata": { "total_intervals": 0, "total_periods": 0, "config": periods_data.get("metadata", {}).get("config", {}), }, } # Collect intervals from filtered periods filtered_intervals = [] for period in filtered_periods: filtered_intervals.extend(period.get("intervals", [])) # Update metadata return { "periods": filtered_periods, "intervals": filtered_intervals, "metadata": { "total_intervals": len(filtered_intervals), "total_periods": len(filtered_periods), "config": periods_data.get("metadata", {}).get("config", {}), }, } def _group_periods_by_day(periods: list[dict]) -> dict[date, list[dict]]: """ Group periods by the day they end in. This ensures periods crossing midnight are counted towards the day they end, not the day they start. Example: Period 23:00 yesterday - 02:00 today counts as "today" since it ends today. Args: periods: List of period summary dicts with "start" and "end" datetime Returns: Dict mapping date to list of periods ending on that date """ periods_by_day: dict[date, list[dict]] = {} for period in periods: # Use end time for grouping so periods crossing midnight are counted # towards the day they end (more relevant for min_periods check) end_time = period.get("end") if end_time: day = end_time.date() periods_by_day.setdefault(day, []).append(period) return periods_by_day def _group_prices_by_day(all_prices: list[dict]) -> dict[date, list[dict]]: """ Group price intervals by the day they belong to (today and future only). Args: all_prices: List of price dicts with "startsAt" timestamp Returns: Dict mapping date to list of price intervals for that day (only today and future) """ today = dt_util.now().date() prices_by_day: dict[date, list[dict]] = {} for price in all_prices: starts_at = dt_util.parse_datetime(price["startsAt"]) if starts_at: price_date = dt_util.as_local(starts_at).date() # Only include today and future days if price_date >= today: prices_by_day.setdefault(price_date, []).append(price) return prices_by_day def _check_min_periods_per_day(periods: list[dict], min_periods: int, all_prices: list[dict]) -> bool: """ Check if minimum periods requirement is met for each day individually. Returns True if we should STOP relaxation (enough periods found per day). Returns False if we should CONTINUE relaxation (not enough periods yet). Args: periods: List of period summary dicts min_periods: Minimum number of periods required per day all_prices: All available price intervals (used to determine which days have data) Returns: True if every day with price data has at least min_periods, False otherwise """ if not periods: return False # No periods at all, continue relaxation # Get all days that have price data (today and future only, not yesterday) today = dt_util.now().date() available_days = set() for price in all_prices: starts_at = dt_util.parse_datetime(price["startsAt"]) if starts_at: price_date = dt_util.as_local(starts_at).date() # Only count today and future days (not yesterday) if price_date >= today: available_days.add(price_date) if not available_days: return False # No price data for today/future, continue relaxation # Group found periods by day periods_by_day = _group_periods_by_day(periods) # Check each day with price data: ALL must have at least min_periods # Only count standalone periods (exclude extensions) for day in available_days: day_periods = periods_by_day.get(day, []) # Count only standalone periods (not extensions) standalone_count = sum(1 for p in day_periods if not p.get("is_extension")) if standalone_count < min_periods: _LOGGER.debug( "Day %s has only %d standalone periods (need %d) - continuing relaxation", day, standalone_count, min_periods, ) return False # This day doesn't have enough, continue relaxation # All days with price data have enough periods, stop relaxation return True def calculate_periods_with_relaxation( # noqa: PLR0913, PLR0915 - Per-day relaxation requires many parameters and statements all_prices: list[dict], *, config: PeriodConfig, enable_relaxation: bool, min_periods: int, relaxation_step_pct: int, should_show_callback: Callable[[str | None, str | None], bool], ) -> tuple[dict[str, Any], dict[str, Any]]: """ Calculate periods with optional per-day filter relaxation. NEW: Each day gets its own independent relaxation loop. Today can be in Phase 1 while tomorrow is in Phase 3, ensuring each day finds enough periods. If min_periods is not reached with normal filters, this function gradually relaxes filters in multiple phases FOR EACH DAY SEPARATELY: Phase 1: Increase flex threshold step-by-step (up to 4 attempts) Phase 2: Disable volatility filter (set to "any") Phase 3: Disable level filter (set to "any") Args: all_prices: All price data points config: Base period configuration enable_relaxation: Whether relaxation is enabled min_periods: Minimum number of periods required PER DAY relaxation_step_pct: Percentage of original flex to add per relaxation step should_show_callback: Callback function(volatility_override, level_override) -> bool Returns True if periods should be shown with given filter overrides. Pass None to use original configured filter values. Returns: Tuple of (periods_result, relaxation_metadata): - periods_result: Same format as calculate_periods() output, with periods from all days - relaxation_metadata: Dict with relaxation information (aggregated across all days) """ # Compact INFO-level summary period_type = "PEAK PRICE" if config.reverse_sort else "BEST PRICE" relaxation_status = "ON" if enable_relaxation else "OFF" if enable_relaxation: _LOGGER.info( "Calculating %s periods: relaxation=%s, target=%d/day, flex=%.1f%%", period_type, relaxation_status, min_periods, abs(config.flex) * 100, ) else: _LOGGER.info( "Calculating %s periods: relaxation=%s, flex=%.1f%%", period_type, relaxation_status, abs(config.flex) * 100, ) # Detailed DEBUG-level context header period_type_full = "PEAK PRICE (most expensive)" if config.reverse_sort else "BEST PRICE (cheapest)" _LOGGER.debug( "%s========== %s PERIODS ==========", INDENT_L0, period_type_full, ) _LOGGER.debug( "%sRelaxation: %s", INDENT_L0, "ENABLED (user setting: ON)" if enable_relaxation else "DISABLED by user configuration", ) _LOGGER.debug( "%sBase config: flex=%.1f%%, min_length=%d min", INDENT_L0, abs(config.flex) * 100, config.min_period_length, ) if enable_relaxation: _LOGGER.debug( "%sRelaxation target: %d periods per day", INDENT_L0, min_periods, ) _LOGGER.debug( "%sRelaxation strategy: %.1f%% flex increment per step (4 flex levels x 4 filter combinations)", INDENT_L0, relaxation_step_pct, ) _LOGGER.debug( "%sEarly exit: After EACH filter combination when target reached", INDENT_L0, ) _LOGGER.debug( "%s=============================================", INDENT_L0, ) # Group prices by day (for both relaxation enabled/disabled) prices_by_day = _group_prices_by_day(all_prices) if not prices_by_day: # No price data for today/future _LOGGER.warning( "No price data available for today/future - cannot calculate periods", ) return {"periods": [], "metadata": {}, "reference_data": {}}, { "relaxation_active": False, "relaxation_attempted": False, "min_periods_requested": min_periods if enable_relaxation else 0, "periods_found": 0, } total_days = len(prices_by_day) _LOGGER.info( "Calculating baseline periods for %d days...", total_days, ) # === BASELINE CALCULATION (same for both modes) === all_periods: list[dict] = [] all_phases_used: list[str] = [] relaxation_was_needed = False days_meeting_requirement = 0 for day, day_prices in sorted(prices_by_day.items()): _LOGGER.debug( "%sProcessing day %s with %d price intervals", INDENT_L1, day, len(day_prices), ) # Calculate baseline periods for this day day_result = calculate_periods(day_prices, config=config) day_periods = day_result["periods"] standalone_count = len([p for p in day_periods if not p.get("is_extension")]) _LOGGER.debug( "%sDay %s baseline: Found %d standalone periods%s", INDENT_L1, day, standalone_count, f" (need {min_periods})" if enable_relaxation else "", ) # Check if relaxation is needed for this day if not enable_relaxation or standalone_count >= min_periods: # No relaxation needed/possible - use baseline if enable_relaxation: _LOGGER.debug( "%sDay %s: Target reached with baseline - no relaxation needed", INDENT_L1, day, ) all_periods.extend(day_periods) days_meeting_requirement += 1 continue # === RELAXATION PATH (only when enabled AND needed) === _LOGGER.debug( "%sDay %s: Baseline insufficient - starting relaxation", INDENT_L1, day, ) relaxation_was_needed = True # Run full relaxation for this specific day day_relaxed_result, day_metadata = _relax_single_day( day_prices=day_prices, config=config, min_periods=min_periods, relaxation_step_pct=relaxation_step_pct, should_show_callback=should_show_callback, baseline_periods=day_periods, day_label=str(day), ) all_periods.extend(day_relaxed_result["periods"]) if day_metadata.get("phases_used"): all_phases_used.extend(day_metadata["phases_used"]) # Check if this day met the requirement after relaxation day_standalone = len([p for p in day_relaxed_result["periods"] if not p.get("is_extension")]) if day_standalone >= min_periods: days_meeting_requirement += 1 # Sort all periods by start time all_periods.sort(key=lambda p: p["start"]) # Recalculate metadata for combined periods _recalculate_period_metadata(all_periods) # Build combined result if all_periods: # Use the last day's result as template final_result = day_result.copy() final_result["periods"] = all_periods else: final_result = {"periods": [], "metadata": {}, "reference_data": {}} total_standalone = len([p for p in all_periods if not p.get("is_extension")]) return final_result, { "relaxation_active": relaxation_was_needed, "relaxation_attempted": relaxation_was_needed, "min_periods_requested": min_periods, "periods_found": total_standalone, "phases_used": list(set(all_phases_used)), # Unique phases used across all days "days_processed": total_days, "days_meeting_requirement": days_meeting_requirement, "relaxation_incomplete": days_meeting_requirement < total_days, } def _relax_single_day( # noqa: PLR0913 - Comprehensive filter relaxation per day day_prices: list[dict], config: PeriodConfig, min_periods: int, relaxation_step_pct: int, should_show_callback: Callable[[str | None, str | None], bool], baseline_periods: list[dict], day_label: str, ) -> tuple[dict[str, Any], dict[str, Any]]: """ Run comprehensive relaxation for a single day. NEW STRATEGY: For each flex level, try all filter combinations before increasing flex. This finds solutions faster by relaxing filters first (cheaper than increasing flex). Per flex level (6.25%, 7.5%, 8.75%, 10%), try in order: 1. Original filters (volatility=configured, level=configured) 2. Relax only volatility (volatility=any, level=configured) 3. Relax only level (volatility=configured, level=any) 4. Relax both (volatility=any, level=any) This ensures we find the minimal relaxation needed. Example: - If periods exist at flex=6.25% with level=any, we find them before trying flex=7.5% - If periods need both filters relaxed, we try that before increasing flex further Args: day_prices: Price data for this specific day only config: Base period configuration min_periods: Minimum periods needed for this day relaxation_step_pct: Relaxation increment percentage should_show_callback: Filter visibility callback(volatility_override, level_override) Returns True if periods should be shown with given overrides. baseline_periods: Periods found with normal filters day_label: Label for logging (e.g., "2025-11-11") Returns: Tuple of (periods_result, metadata) for this day """ accumulated_periods = baseline_periods.copy() original_flex = abs(config.flex) relaxation_increment = original_flex * (relaxation_step_pct / 100.0) phases_used = [] relaxed_result = None baseline_standalone = len([p for p in baseline_periods if not p.get("is_extension")]) # 4 flex levels: original + 3 steps (e.g., 5% → 6.25% → 7.5% → 8.75% → 10%) for flex_step in range(1, 5): new_flex = original_flex + (flex_step * relaxation_increment) new_flex = min(new_flex, 100.0) if config.reverse_sort: new_flex = -new_flex # Try filter combinations for this flex level # Each tuple contains: volatility_override, level_override, label_suffix filter_attempts = [ (None, None, ""), # Original config ("any", None, "+volatility_any"), # Relax volatility only (None, "any", "+level_any"), # Relax level only ("any", "any", "+all_filters_any"), # Relax both ] for vol_override, lvl_override, label_suffix in filter_attempts: # Check if this combination is allowed by user config if not should_show_callback(vol_override, lvl_override): continue # Calculate periods with this flex + filter combination # Apply level override if specified level_filter_value = lvl_override if lvl_override else config.level_filter # Log filter changes flex_pct = round(abs(new_flex) * 100, 1) if lvl_override: _LOGGER.debug( "%sDay %s flex=%.1f%%: OVERRIDING level_filter: %s → %s", INDENT_L2, day_label, flex_pct, config.level_filter or "None", lvl_override.upper(), ) relaxed_config = config._replace( flex=new_flex, level_filter=level_filter_value, ) relaxed_result = calculate_periods(day_prices, config=relaxed_config) new_periods = relaxed_result["periods"] # Build relaxation level label BEFORE marking periods relaxation_level = f"price_diff_{flex_pct}%{label_suffix}" phases_used.append(relaxation_level) # Mark NEW periods with their specific relaxation metadata BEFORE merging for period in new_periods: period["relaxation_active"] = True # Set the metadata immediately - this preserves which phase found this period _mark_periods_with_relaxation([period], relaxation_level, original_flex, abs(new_flex)) # Merge with accumulated periods merged, standalone_count = _resolve_period_overlaps( accumulated_periods, new_periods, config.min_period_length, baseline_periods ) total_standalone = standalone_count + baseline_standalone filters_label = label_suffix if label_suffix else "(original filters)" _LOGGER.debug( "%sDay %s flex=%.1f%% %s: found %d new periods, %d standalone total (%d baseline + %d new)", INDENT_L2, day_label, flex_pct, filters_label, len(new_periods), total_standalone, baseline_standalone, standalone_count, ) accumulated_periods = merged.copy() # ✅ EARLY EXIT: Check after EACH filter combination if total_standalone >= min_periods: _LOGGER.info( "Day %s: Success with flex=%.1f%% %s - found %d/%d periods (%d baseline + %d from relaxation)", day_label, flex_pct, filters_label, total_standalone, min_periods, baseline_standalone, standalone_count, ) _recalculate_period_metadata(merged) result = relaxed_result.copy() result["periods"] = merged return result, {"phases_used": phases_used} # ❌ Only reach here if ALL phases exhausted WITHOUT reaching min_periods final_standalone = len([p for p in accumulated_periods if not p.get("is_extension")]) new_standalone = final_standalone - baseline_standalone _LOGGER.warning( "Day %s: All relaxation phases exhausted WITHOUT reaching goal - " "found %d/%d standalone periods (%d baseline + %d from relaxation)", day_label, final_standalone, min_periods, baseline_standalone, new_standalone, ) _recalculate_period_metadata(accumulated_periods) if relaxed_result: result = relaxed_result.copy() else: result = {"periods": accumulated_periods, "metadata": {}, "reference_data": {}} result["periods"] = accumulated_periods return result, {"phases_used": phases_used} def _mark_periods_with_relaxation( periods: list[dict], relaxation_level: str, original_threshold: float, applied_threshold: float, ) -> None: """ Mark periods with relaxation information (mutates period dicts in-place). Uses consistent 'relaxation_*' prefix for all relaxation-related attributes. Args: periods: List of period dicts to mark relaxation_level: String describing the relaxation level original_threshold: Original flex threshold value (decimal, e.g., 0.19 for 19%) applied_threshold: Actually applied threshold value (decimal, e.g., 0.25 for 25%) """ for period in periods: period["relaxation_active"] = True period["relaxation_level"] = relaxation_level # Convert decimal to percentage for display (0.19 → 19.0) period["relaxation_threshold_original_%"] = round(original_threshold * 100, 1) period["relaxation_threshold_applied_%"] = round(applied_threshold * 100, 1) def _resolve_period_overlaps( # noqa: PLR0912, PLR0915, C901 - Complex overlap resolution with replacement and extension logic existing_periods: list[dict], new_relaxed_periods: list[dict], min_period_length: int, baseline_periods: list[dict] | None = None, ) -> tuple[list[dict], int]: """ Resolve overlaps between existing periods and newly found relaxed periods. Existing periods (baseline + previous relaxation phases) have priority and remain unchanged. Newly relaxed periods are adjusted to not overlap with existing periods. After splitting relaxed periods to avoid overlaps, each segment is validated against min_period_length. Segments shorter than this threshold are discarded. This function is called incrementally after each relaxation phase: - Phase 1: existing = accumulated, baseline = baseline - Phase 2: existing = accumulated, baseline = baseline - Phase 3: existing = accumulated, baseline = baseline Args: existing_periods: All previously found periods (baseline + earlier relaxation phases) new_relaxed_periods: Periods found in current relaxation phase (will be adjusted) min_period_length: Minimum period length in minutes (segments shorter than this are discarded) baseline_periods: Original baseline periods (for extension detection). Extensions only count against baseline, not against other relaxation periods. Returns: Tuple of (merged_periods, count_standalone_relaxed): - merged_periods: All periods (existing + adjusted new), sorted by start time - count_standalone_relaxed: Number of new relaxed periods that count toward min_periods (excludes extensions of baseline periods only) """ if baseline_periods is None: baseline_periods = existing_periods # Fallback to existing if not provided _LOGGER.debug( "%s_resolve_period_overlaps called: existing=%d, new=%d, baseline=%d", INDENT_L3, len(existing_periods), len(new_relaxed_periods), len(baseline_periods), ) if not new_relaxed_periods: return existing_periods.copy(), 0 if not existing_periods: # No overlaps possible - all relaxed periods are standalone return new_relaxed_periods.copy(), len(new_relaxed_periods) merged = existing_periods.copy() count_standalone = 0 for relaxed in new_relaxed_periods: # Skip if this exact period is already in existing_periods (duplicate from previous relaxation attempt) # Compare current start/end (before any splitting), not original_start/end # Note: original_start/end are set AFTER splitting and indicate split segments from same source relaxed_start = relaxed["start"] relaxed_end = relaxed["end"] is_duplicate = False for existing in existing_periods: # Only compare with existing periods that haven't been adjusted (unsplit originals) # If existing has original_start/end, it's already a split segment - skip comparison if "original_start" in existing: continue existing_start = existing["start"] existing_end = existing["end"] # Duplicate if same boundaries (within 1 minute tolerance) tolerance_seconds = 60 # 1 minute tolerance for duplicate detection if ( abs((relaxed_start - existing_start).total_seconds()) < tolerance_seconds and abs((relaxed_end - existing_end).total_seconds()) < tolerance_seconds ): is_duplicate = True _LOGGER.debug( "%sSkipping duplicate period %s-%s (already exists from previous relaxation)", INDENT_L4, relaxed_start.strftime("%H:%M"), relaxed_end.strftime("%H:%M"), ) break if is_duplicate: continue # Find all overlapping existing periods overlaps = [] for existing in existing_periods: existing_start = existing["start"] existing_end = existing["end"] # Check for overlap if relaxed_start < existing_end and relaxed_end > existing_start: overlaps.append((existing_start, existing_end)) if not overlaps: # No overlap - check if adjacent to baseline period (= extension) # Only baseline extensions don't count toward min_periods is_extension = False for baseline in baseline_periods: if relaxed_end == baseline["start"] or relaxed_start == baseline["end"]: is_extension = True break if is_extension: relaxed["is_extension"] = True _LOGGER.debug( "%sMarking period %s-%s as extension (no overlap, adjacent to baseline)", INDENT_L4, relaxed_start.strftime("%H:%M"), relaxed_end.strftime("%H:%M"), ) else: count_standalone += 1 merged.append(relaxed) else: # Has overlaps - check if this new period extends BASELINE periods # Extension = new period encompasses/extends baseline period(s) # Note: If new period encompasses OTHER RELAXED periods, that's a replacement, not extension! is_extension = False periods_to_replace = [] for existing in existing_periods: existing_start = existing["start"] existing_end = existing["end"] # Check if new period completely encompasses existing period if relaxed_start <= existing_start and relaxed_end >= existing_end: # Is this existing period a BASELINE period? is_baseline = any( bp["start"] == existing_start and bp["end"] == existing_end for bp in baseline_periods ) if is_baseline: # Extension of baseline → counts as extension is_extension = True _LOGGER.debug( "%sNew period %s-%s extends BASELINE period %s-%s", INDENT_L4, relaxed_start.strftime("%H:%M"), relaxed_end.strftime("%H:%M"), existing_start.strftime("%H:%M"), existing_end.strftime("%H:%M"), ) else: # Encompasses another relaxed period → REPLACEMENT, not extension periods_to_replace.append(existing) _LOGGER.debug( "%sNew period %s-%s replaces relaxed period %s-%s (larger is better)", INDENT_L4, relaxed_start.strftime("%H:%M"), relaxed_end.strftime("%H:%M"), existing_start.strftime("%H:%M"), existing_end.strftime("%H:%M"), ) # Remove periods that are being replaced by this larger period if periods_to_replace: for period_to_remove in periods_to_replace: if period_to_remove in merged: merged.remove(period_to_remove) _LOGGER.debug( "%sReplaced period %s-%s with larger period %s-%s", INDENT_L5, period_to_remove["start"].strftime("%H:%M"), period_to_remove["end"].strftime("%H:%M"), relaxed_start.strftime("%H:%M"), relaxed_end.strftime("%H:%M"), ) # Split the relaxed period into non-overlapping segments segments = _split_period_by_overlaps(relaxed_start, relaxed_end, overlaps) # If no segments (completely overlapped), but we replaced periods, add the full period if not segments and periods_to_replace: _LOGGER.debug( "%sAdding full replacement period %s-%s (no non-overlapping segments)", INDENT_L5, relaxed_start.strftime("%H:%M"), relaxed_end.strftime("%H:%M"), ) # Mark as extension if it extends baseline, otherwise standalone if is_extension: relaxed["is_extension"] = True merged.append(relaxed) continue for seg_start, seg_end in segments: # Calculate segment duration in minutes segment_duration_minutes = int((seg_end - seg_start).total_seconds() / 60) # Skip segment if it's too short if segment_duration_minutes < min_period_length: continue # Create adjusted period segment adjusted_period = relaxed.copy() adjusted_period["start"] = seg_start adjusted_period["end"] = seg_end adjusted_period["duration_minutes"] = segment_duration_minutes # Mark as adjusted and potentially as extension adjusted_period["adjusted_for_overlap"] = True adjusted_period["original_start"] = relaxed_start adjusted_period["original_end"] = relaxed_end # If the original period was an extension, all its segments are extensions too # OR if segment is adjacent to baseline segment_is_extension = is_extension if not segment_is_extension: # Check if segment is directly adjacent to BASELINE period for baseline in baseline_periods: if seg_end == baseline["start"] or seg_start == baseline["end"]: segment_is_extension = True break if segment_is_extension: adjusted_period["is_extension"] = True _LOGGER.debug( "%sMarking segment %s-%s as extension (original was extension or adjacent to baseline)", INDENT_L5, seg_start.strftime("%H:%M"), seg_end.strftime("%H:%M"), ) else: # Standalone segment counts toward min_periods count_standalone += 1 merged.append(adjusted_period) # Sort all periods by start time merged.sort(key=lambda p: p["start"]) # Count ACTUAL standalone periods in final merged list (not just newly added ones) # This accounts for replacements where old standalone was replaced by new standalone final_standalone_count = len([p for p in merged if not p.get("is_extension")]) # Subtract baseline standalone count to get NEW standalone from this relaxation baseline_standalone_count = len([p for p in baseline_periods if not p.get("is_extension")]) new_standalone_count = final_standalone_count - baseline_standalone_count return merged, new_standalone_count def _split_period_by_overlaps( period_start: datetime, period_end: datetime, overlaps: list[tuple[datetime, datetime]], ) -> list[tuple[datetime, datetime]]: """ Split a time period into segments that don't overlap with given ranges. Args: period_start: Start of period to split period_end: End of period to split overlaps: List of (start, end) tuples representing overlapping ranges Returns: List of (start, end) tuples for non-overlapping segments Example: period: 09:00-15:00 overlaps: [(10:00-12:00), (14:00-16:00)] result: [(09:00-10:00), (12:00-14:00)] """ # Sort overlaps by start time sorted_overlaps = sorted(overlaps, key=lambda x: x[0]) segments = [] current_pos = period_start for overlap_start, overlap_end in sorted_overlaps: # Add segment before this overlap (if any) if current_pos < overlap_start: segments.append((current_pos, overlap_start)) # Move position past this overlap current_pos = max(current_pos, overlap_end) # Add final segment after all overlaps (if any) if current_pos < period_end: segments.append((current_pos, period_end)) return segments