hass.tibber_prices/custom_components/tibber_prices/period_utils/period_merging.py
Julian Pawlowski 23b8bd1c62 feat(periods): modularize period_utils and add statistical outlier filtering
Refactored monolithic period_utils.py (1800 lines) into focused modules
for better maintainability and added advanced outlier filtering with
smart impact tracking.

Modular structure:
- types.py: Type definitions and constants (89 lines)
- level_filtering.py: Level filtering with gap tolerance (121 lines)
- period_building.py: Period construction from intervals (238 lines)
- period_statistics.py: Statistics and summaries (318 lines)
- period_merging.py: Overlap resolution (382 lines)
- relaxation.py: Per-day relaxation strategy (547 lines)
- core.py: Main API orchestration (251 lines)
- outlier_filtering.py: Statistical spike detection (294 lines)
- __init__.py: Public API exports (62 lines)

New statistical outlier filtering:
- Linear regression for trend-based spike detection
- 2 standard deviation confidence intervals (95%)
- Symmetry checking to preserve legitimate price shifts
- Enhanced zigzag detection with relative volatility (catches clusters)
- Replaces simple average smoothing with trend-based predictions

Smart impact tracking:
- Tests if original price would have passed criteria
- Only counts smoothed intervals that actually changed period formation
- Tracks level gap tolerance usage separately
- Both attributes only appear when > 0 (clean UI)

New period attributes:
- period_interval_smoothed_count: Intervals kept via outlier smoothing
- period_interval_level_gap_count: Intervals kept via gap tolerance

Impact: Statistical outlier filtering prevents isolated price spikes from
breaking continuous periods while preserving data integrity. All statistics
use original prices. Smart tracking shows only meaningful interventions,
making it clear when tolerance mechanisms actually influenced results.

Backwards compatible: All public APIs re-exported from period_utils package.
2025-11-12 16:37:34 +00:00

382 lines
15 KiB
Python

"""Period merging and overlap resolution logic."""
from __future__ import annotations
import logging
from datetime import datetime, timedelta
from custom_components.tibber_prices.period_utils.types import MINUTES_PER_INTERVAL
from homeassistant.util import dt as dt_util
_LOGGER = logging.getLogger(__name__)
# Module-local log indentation (each module starts at level 0)
INDENT_L0 = "" # Entry point / main function
INDENT_L1 = " " # Nested logic / loop iterations
INDENT_L2 = " " # Deeper nesting
def merge_adjacent_periods_at_midnight(periods: list[list[dict]]) -> list[list[dict]]:
"""
Merge adjacent periods that meet at midnight.
When two periods are detected separately for consecutive days but are directly
adjacent at midnight (15 minutes apart), merge them into a single period.
"""
if not periods:
return periods
merged = []
i = 0
while i < len(periods):
current_period = periods[i]
# Check if there's a next period and if they meet at midnight
if i + 1 < len(periods):
next_period = periods[i + 1]
last_start = current_period[-1].get("interval_start")
next_start = next_period[0].get("interval_start")
if last_start and next_start:
time_diff = next_start - last_start
last_date = last_start.date()
next_date = next_start.date()
# If they are 15 minutes apart and on different days (crossing midnight)
if time_diff == timedelta(minutes=MINUTES_PER_INTERVAL) and next_date > last_date:
# Merge the two periods
merged_period = current_period + next_period
merged.append(merged_period)
i += 2 # Skip both periods as we've merged them
continue
# If no merge happened, just add the current period
merged.append(current_period)
i += 1
return merged
def recalculate_period_metadata(periods: list[dict]) -> None:
"""
Recalculate period metadata after merging periods.
Updates period_position, periods_total, and periods_remaining for all periods
based on chronological order.
This must be called after resolve_period_overlaps() to ensure metadata
reflects the final merged period list.
Args:
periods: List of period summary dicts (mutated in-place)
"""
if not periods:
return
# Sort periods chronologically by start time
periods.sort(key=lambda p: p.get("start") or dt_util.now())
# Update metadata for all periods
total_periods = len(periods)
for position, period in enumerate(periods, 1):
period["period_position"] = position
period["periods_total"] = total_periods
period["periods_remaining"] = total_periods - position
def split_period_by_overlaps(
period_start: datetime,
period_end: datetime,
overlaps: list[tuple[datetime, datetime]],
) -> list[tuple[datetime, datetime]]:
"""
Split a time period into segments that don't overlap with given ranges.
Args:
period_start: Start of period to split
period_end: End of period to split
overlaps: List of (start, end) tuples representing overlapping ranges
Returns:
List of (start, end) tuples for non-overlapping segments
Example:
period: 09:00-15:00
overlaps: [(10:00-12:00), (14:00-16:00)]
result: [(09:00-10:00), (12:00-14:00)]
"""
# Sort overlaps by start time
sorted_overlaps = sorted(overlaps, key=lambda x: x[0])
segments = []
current_pos = period_start
for overlap_start, overlap_end in sorted_overlaps:
# Add segment before this overlap (if any)
if current_pos < overlap_start:
segments.append((current_pos, overlap_start))
# Move position past this overlap
current_pos = max(current_pos, overlap_end)
# Add final segment after all overlaps (if any)
if current_pos < period_end:
segments.append((current_pos, period_end))
return segments
def resolve_period_overlaps( # noqa: PLR0912, PLR0915, C901 - Complex overlap resolution with replacement and extension logic
existing_periods: list[dict],
new_relaxed_periods: list[dict],
min_period_length: int,
baseline_periods: list[dict] | None = None,
) -> tuple[list[dict], int]:
"""
Resolve overlaps between existing periods and newly found relaxed periods.
Existing periods (baseline + previous relaxation phases) have priority and remain unchanged.
Newly relaxed periods are adjusted to not overlap with existing periods.
After splitting relaxed periods to avoid overlaps, each segment is validated against
min_period_length. Segments shorter than this threshold are discarded.
This function is called incrementally after each relaxation phase:
- Phase 1: existing = accumulated, baseline = baseline
- Phase 2: existing = accumulated, baseline = baseline
- Phase 3: existing = accumulated, baseline = baseline
Args:
existing_periods: All previously found periods (baseline + earlier relaxation phases)
new_relaxed_periods: Periods found in current relaxation phase (will be adjusted)
min_period_length: Minimum period length in minutes (segments shorter than this are discarded)
baseline_periods: Original baseline periods (for extension detection). Extensions only count
against baseline, not against other relaxation periods.
Returns:
Tuple of (merged_periods, count_standalone_relaxed):
- merged_periods: All periods (existing + adjusted new), sorted by start time
- count_standalone_relaxed: Number of new relaxed periods that count toward min_periods
(excludes extensions of baseline periods only)
"""
if baseline_periods is None:
baseline_periods = existing_periods # Fallback to existing if not provided
_LOGGER.debug(
"%sresolve_period_overlaps called: existing=%d, new=%d, baseline=%d",
INDENT_L0,
len(existing_periods),
len(new_relaxed_periods),
len(baseline_periods),
)
if not new_relaxed_periods:
return existing_periods.copy(), 0
if not existing_periods:
# No overlaps possible - all relaxed periods are standalone
return new_relaxed_periods.copy(), len(new_relaxed_periods)
merged = existing_periods.copy()
count_standalone = 0
for relaxed in new_relaxed_periods:
# Skip if this exact period is already in existing_periods (duplicate from previous relaxation attempt)
# Compare current start/end (before any splitting), not original_start/end
# Note: original_start/end are set AFTER splitting and indicate split segments from same source
relaxed_start = relaxed["start"]
relaxed_end = relaxed["end"]
is_duplicate = False
for existing in existing_periods:
# Only compare with existing periods that haven't been adjusted (unsplit originals)
# If existing has original_start/end, it's already a split segment - skip comparison
if "original_start" in existing:
continue
existing_start = existing["start"]
existing_end = existing["end"]
# Duplicate if same boundaries (within 1 minute tolerance)
tolerance_seconds = 60 # 1 minute tolerance for duplicate detection
if (
abs((relaxed_start - existing_start).total_seconds()) < tolerance_seconds
and abs((relaxed_end - existing_end).total_seconds()) < tolerance_seconds
):
is_duplicate = True
_LOGGER.debug(
"%sSkipping duplicate period %s-%s (already exists from previous relaxation)",
INDENT_L1,
relaxed_start.strftime("%H:%M"),
relaxed_end.strftime("%H:%M"),
)
break
if is_duplicate:
continue
# Find all overlapping existing periods
overlaps = []
for existing in existing_periods:
existing_start = existing["start"]
existing_end = existing["end"]
# Check for overlap
if relaxed_start < existing_end and relaxed_end > existing_start:
overlaps.append((existing_start, existing_end))
if not overlaps:
# No overlap - check if adjacent to baseline period (= extension)
# Only baseline extensions don't count toward min_periods
is_extension = False
for baseline in baseline_periods:
if relaxed_end == baseline["start"] or relaxed_start == baseline["end"]:
is_extension = True
break
if is_extension:
relaxed["is_extension"] = True
_LOGGER.debug(
"%sMarking period %s-%s as extension (no overlap, adjacent to baseline)",
INDENT_L1,
relaxed_start.strftime("%H:%M"),
relaxed_end.strftime("%H:%M"),
)
else:
count_standalone += 1
merged.append(relaxed)
else:
# Has overlaps - check if this new period extends BASELINE periods
# Extension = new period encompasses/extends baseline period(s)
# Note: If new period encompasses OTHER RELAXED periods, that's a replacement, not extension!
is_extension = False
periods_to_replace = []
for existing in existing_periods:
existing_start = existing["start"]
existing_end = existing["end"]
# Check if new period completely encompasses existing period
if relaxed_start <= existing_start and relaxed_end >= existing_end:
# Is this existing period a BASELINE period?
is_baseline = any(
bp["start"] == existing_start and bp["end"] == existing_end for bp in baseline_periods
)
if is_baseline:
# Extension of baseline → counts as extension
is_extension = True
_LOGGER.debug(
"%sNew period %s-%s extends BASELINE period %s-%s",
INDENT_L1,
relaxed_start.strftime("%H:%M"),
relaxed_end.strftime("%H:%M"),
existing_start.strftime("%H:%M"),
existing_end.strftime("%H:%M"),
)
else:
# Encompasses another relaxed period → REPLACEMENT, not extension
periods_to_replace.append(existing)
_LOGGER.debug(
"%sNew period %s-%s replaces relaxed period %s-%s (larger is better)",
INDENT_L1,
relaxed_start.strftime("%H:%M"),
relaxed_end.strftime("%H:%M"),
existing_start.strftime("%H:%M"),
existing_end.strftime("%H:%M"),
)
# Remove periods that are being replaced by this larger period
if periods_to_replace:
for period_to_remove in periods_to_replace:
if period_to_remove in merged:
merged.remove(period_to_remove)
_LOGGER.debug(
"%sReplaced period %s-%s with larger period %s-%s",
INDENT_L2,
period_to_remove["start"].strftime("%H:%M"),
period_to_remove["end"].strftime("%H:%M"),
relaxed_start.strftime("%H:%M"),
relaxed_end.strftime("%H:%M"),
)
# Split the relaxed period into non-overlapping segments
segments = split_period_by_overlaps(relaxed_start, relaxed_end, overlaps)
# If no segments (completely overlapped), but we replaced periods, add the full period
if not segments and periods_to_replace:
_LOGGER.debug(
"%sAdding full replacement period %s-%s (no non-overlapping segments)",
INDENT_L2,
relaxed_start.strftime("%H:%M"),
relaxed_end.strftime("%H:%M"),
)
# Mark as extension if it extends baseline, otherwise standalone
if is_extension:
relaxed["is_extension"] = True
merged.append(relaxed)
continue
for seg_start, seg_end in segments:
# Calculate segment duration in minutes
segment_duration_minutes = int((seg_end - seg_start).total_seconds() / 60)
# Skip segment if it's too short
if segment_duration_minutes < min_period_length:
continue
# Create adjusted period segment
adjusted_period = relaxed.copy()
adjusted_period["start"] = seg_start
adjusted_period["end"] = seg_end
adjusted_period["duration_minutes"] = segment_duration_minutes
# Mark as adjusted and potentially as extension
adjusted_period["adjusted_for_overlap"] = True
adjusted_period["original_start"] = relaxed_start
adjusted_period["original_end"] = relaxed_end
# If the original period was an extension, all its segments are extensions too
# OR if segment is adjacent to baseline
segment_is_extension = is_extension
if not segment_is_extension:
# Check if segment is directly adjacent to BASELINE period
for baseline in baseline_periods:
if seg_end == baseline["start"] or seg_start == baseline["end"]:
segment_is_extension = True
break
if segment_is_extension:
adjusted_period["is_extension"] = True
_LOGGER.debug(
"%sMarking segment %s-%s as extension (original was extension or adjacent to baseline)",
INDENT_L2,
seg_start.strftime("%H:%M"),
seg_end.strftime("%H:%M"),
)
else:
# Standalone segment counts toward min_periods
count_standalone += 1
merged.append(adjusted_period)
# Sort all periods by start time
merged.sort(key=lambda p: p["start"])
# Count ACTUAL standalone periods in final merged list (not just newly added ones)
# This accounts for replacements where old standalone was replaced by new standalone
final_standalone_count = len([p for p in merged if not p.get("is_extension")])
# Subtract baseline standalone count to get NEW standalone from this relaxation
baseline_standalone_count = len([p for p in baseline_periods if not p.get("is_extension")])
new_standalone_count = final_standalone_count - baseline_standalone_count
return merged, new_standalone_count