hass.tibber_prices/custom_components/tibber_prices/period_utils/period_merging.py
Julian Pawlowski 383b495545
Feature/adaptive defaults (#22)
* feat(period-calc): adaptive defaults + remove volatility filter

Major improvements to period calculation with smarter defaults and
simplified configuration:

**Adaptive Defaults:**
- ENABLE_MIN_PERIODS: true (was false) - Always try to find periods
- MIN_PERIODS target: 2 periods/day (ensures coverage)
- BEST_PRICE_MAX_LEVEL: "cheap" (was "any") - Prefer genuinely cheap
- PEAK_PRICE_MIN_LEVEL: "expensive" (was "any") - Prefer genuinely expensive
- GAP_TOLERANCE: 1 (was 0) - Allow 1-level deviations in sequences
- MIN_DISTANCE_FROM_AVG: 5% (was 2%) - Ensure significance
- PEAK_PRICE_MIN_PERIOD_LENGTH: 30min (was 60min) - More responsive
- PEAK_PRICE_FLEX: -20% (was -15%) - Better peak detection

**Volatility Filter Removal:**
- Removed CONF_BEST_PRICE_MIN_VOLATILITY from const.py
- Removed CONF_PEAK_PRICE_MIN_VOLATILITY from const.py
- Removed volatility filter UI controls from config_flow.py
- Removed filter_periods_by_volatility() calls from coordinator.py
- Updated all 5 translations (de, en, nb, nl, sv)

**Period Calculation Logic:**
- Level filter now integrated into _build_periods() (applied during
  interval qualification, not as post-filter)
- Gap tolerance implemented via _check_level_with_gap_tolerance()
- Short periods (<1.5h) use strict filtering (no gap tolerance)
- Relaxation now passes level_filter + gap_count directly to
  PeriodConfig
- show_periods check skipped when relaxation enabled (relaxation
  tries "any" as fallback)

**Documentation:**
- Complete rewrite of docs/user/period-calculation.md:
  * Visual examples with timelines
  * Step-by-step explanation of 4-step process
  * Configuration scenarios (5 common use cases)
  * Troubleshooting section with specific fixes
  * Advanced topics (per-day independence, early stop, etc.)
- Updated README.md: "volatility" → "distance from average"

Impact: Periods now reliably appear on most days with meaningful
quality filters. Users get warned about expensive periods and notified
about cheap opportunities without manual tuning. Relaxation ensures
coverage while keeping filters as strict as possible.

Breaking change: Volatility filter removed (was never a critical
feature, often confused users). Existing configs continue to work
(removed keys are simply ignored).

* feat(periods): modularize period_utils and add statistical outlier filtering

Refactored monolithic period_utils.py (1800 lines) into focused modules
for better maintainability and added advanced outlier filtering with
smart impact tracking.

Modular structure:
- types.py: Type definitions and constants (89 lines)
- level_filtering.py: Level filtering with gap tolerance (121 lines)
- period_building.py: Period construction from intervals (238 lines)
- period_statistics.py: Statistics and summaries (318 lines)
- period_merging.py: Overlap resolution (382 lines)
- relaxation.py: Per-day relaxation strategy (547 lines)
- core.py: Main API orchestration (251 lines)
- outlier_filtering.py: Statistical spike detection (294 lines)
- __init__.py: Public API exports (62 lines)

New statistical outlier filtering:
- Linear regression for trend-based spike detection
- 2 standard deviation confidence intervals (95%)
- Symmetry checking to preserve legitimate price shifts
- Enhanced zigzag detection with relative volatility (catches clusters)
- Replaces simple average smoothing with trend-based predictions

Smart impact tracking:
- Tests if original price would have passed criteria
- Only counts smoothed intervals that actually changed period formation
- Tracks level gap tolerance usage separately
- Both attributes only appear when > 0 (clean UI)

New period attributes:
- period_interval_smoothed_count: Intervals kept via outlier smoothing
- period_interval_level_gap_count: Intervals kept via gap tolerance

Impact: Statistical outlier filtering prevents isolated price spikes from
breaking continuous periods while preserving data integrity. All statistics
use original prices. Smart tracking shows only meaningful interventions,
making it clear when tolerance mechanisms actually influenced results.

Backwards compatible: All public APIs re-exported from period_utils package.

* Update docs/user/period-calculation.md

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update custom_components/tibber_prices/const.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update custom_components/tibber_prices/coordinator.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update custom_components/tibber_prices/const.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* docs(periods): fix corrupted period-calculation.md and add outlier filtering documentation

Completely rewrote period-calculation.md after severe corruption (massive text
duplication throughout the file made it 2489 lines).

Changes:
- Fixed formatting: Removed all duplicate text and headers
- Reduced file size: 2594 lines down to 516 lines (clean, readable structure)
- Added section 5: "Statistical Outlier Filtering (NEW)" explaining:
  - Linear regression-based spike detection (95% confidence intervals)
  - Symmetry checking to preserve legitimate price shifts
  - Enhanced zigzag detection with relative volatility
  - Data integrity guarantees (original prices always used)
  - New period attributes: period_interval_smoothed_count
- Added troubleshooting: "Price spikes breaking periods" section
- Added technical details: Algorithm constants and implementation notes

Impact: Users can now understand how outlier filtering prevents isolated
price spikes from breaking continuous periods. Documentation is readable
again with no duplicate content.

* fix(const): improve clarity in comments regarding period lengths for price alerts

* docs(periods): improve formatting and clarity in period-calculation.md

* Initial plan

* refactor: convert flexibility_pct to ratio once at function entry

Co-authored-by: jpawlowski <75446+jpawlowski@users.noreply.github.com>

* Update custom_components/tibber_prices/const.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update custom_components/tibber_prices/period_utils/period_building.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update custom_components/tibber_prices/period_utils/relaxation.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

---------

Co-authored-by: Julian Pawlowski <jpawlowski@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
2025-11-13 23:51:29 +01:00

382 lines
15 KiB
Python

"""Period merging and overlap resolution logic."""

from __future__ import annotations

import logging
from datetime import datetime, timedelta

from custom_components.tibber_prices.period_utils.types import MINUTES_PER_INTERVAL
from homeassistant.util import dt as dt_util

_LOGGER = logging.getLogger(__name__)

# Module-local log indentation (each module starts at level 0)
INDENT_L0 = ""  # Entry point / main function
INDENT_L1 = "  "  # Nested logic / loop iterations
INDENT_L2 = "    "  # Deeper nesting


def merge_adjacent_periods_at_midnight(periods: list[list[dict]]) -> list[list[dict]]:
    """
    Merge adjacent periods that meet at midnight.

    When two periods are detected separately for consecutive days but are directly
    adjacent at midnight (15 minutes apart), merge them into a single period.
    """
    if not periods:
        return periods

    merged = []
    i = 0
    while i < len(periods):
        current_period = periods[i]

        # Check if there's a next period and if they meet at midnight
        if i + 1 < len(periods):
            next_period = periods[i + 1]
            last_start = current_period[-1].get("interval_start")
            next_start = next_period[0].get("interval_start")

            if last_start and next_start:
                time_diff = next_start - last_start
                last_date = last_start.date()
                next_date = next_start.date()

                # If they are 15 minutes apart and on different days (crossing midnight)
                if time_diff == timedelta(minutes=MINUTES_PER_INTERVAL) and next_date > last_date:
                    # Merge the two periods
                    merged_period = current_period + next_period
                    merged.append(merged_period)
                    i += 2  # Skip both periods as we've merged them
                    continue

        # If no merge happened, just add the current period
        merged.append(current_period)
        i += 1

    return merged


def recalculate_period_metadata(periods: list[dict]) -> None:
    """
    Recalculate period metadata after merging periods.

    Updates period_position, periods_total, and periods_remaining for all periods
    based on chronological order.

    This must be called after resolve_period_overlaps() to ensure metadata
    reflects the final merged period list.

    Args:
        periods: List of period summary dicts (mutated in-place)
    """
    if not periods:
        return

    # Sort periods chronologically by start time
    periods.sort(key=lambda p: p.get("start") or dt_util.now())

    # Update metadata for all periods
    total_periods = len(periods)
    for position, period in enumerate(periods, 1):
        period["period_position"] = position
        period["periods_total"] = total_periods
        period["periods_remaining"] = total_periods - position


def split_period_by_overlaps(
    period_start: datetime,
    period_end: datetime,
    overlaps: list[tuple[datetime, datetime]],
) -> list[tuple[datetime, datetime]]:
    """
    Split a time period into segments that don't overlap with given ranges.

    Args:
        period_start: Start of period to split
        period_end: End of period to split
        overlaps: List of (start, end) tuples representing overlapping ranges

    Returns:
        List of (start, end) tuples for non-overlapping segments

    Example:
        period: 09:00-15:00
        overlaps: [(10:00-12:00), (14:00-16:00)]
        result: [(09:00-10:00), (12:00-14:00)]
    """
    # Sort overlaps by start time
    sorted_overlaps = sorted(overlaps, key=lambda x: x[0])

    segments = []
    current_pos = period_start

    for overlap_start, overlap_end in sorted_overlaps:
        # Add segment before this overlap (if any)
        if current_pos < overlap_start:
            segments.append((current_pos, overlap_start))
        # Move position past this overlap
        current_pos = max(current_pos, overlap_end)

    # Add final segment after all overlaps (if any)
    if current_pos < period_end:
        segments.append((current_pos, period_end))

    return segments


def resolve_period_overlaps(  # noqa: PLR0912, PLR0915, C901 - Complex overlap resolution with replacement and extension logic
    existing_periods: list[dict],
    new_relaxed_periods: list[dict],
    min_period_length: int,
    baseline_periods: list[dict] | None = None,
) -> tuple[list[dict], int]:
    """
    Resolve overlaps between existing periods and newly found relaxed periods.

    Existing periods (baseline + previous relaxation phases) have priority and remain unchanged.
    Newly relaxed periods are adjusted to not overlap with existing periods.

    After splitting relaxed periods to avoid overlaps, each segment is validated against
    min_period_length. Segments shorter than this threshold are discarded.

    This function is called incrementally after each relaxation phase:
    - Phase 1: existing = accumulated, baseline = baseline
    - Phase 2: existing = accumulated, baseline = baseline
    - Phase 3: existing = accumulated, baseline = baseline

    Args:
        existing_periods: All previously found periods (baseline + earlier relaxation phases)
        new_relaxed_periods: Periods found in current relaxation phase (will be adjusted)
        min_period_length: Minimum period length in minutes (segments shorter than this are discarded)
        baseline_periods: Original baseline periods (for extension detection). Extensions only count
            against baseline, not against other relaxation periods.

    Returns:
        Tuple of (merged_periods, count_standalone_relaxed):
        - merged_periods: All periods (existing + adjusted new), sorted by start time
        - count_standalone_relaxed: Number of new relaxed periods that count toward min_periods
          (excludes extensions of baseline periods only)
    """
    if baseline_periods is None:
        baseline_periods = existing_periods  # Fallback to existing if not provided

    _LOGGER.debug(
        "%sresolve_period_overlaps called: existing=%d, new=%d, baseline=%d",
        INDENT_L0,
        len(existing_periods),
        len(new_relaxed_periods),
        len(baseline_periods),
    )

    if not new_relaxed_periods:
        return existing_periods.copy(), 0

    if not existing_periods:
        # No overlaps possible - all relaxed periods are standalone
        return new_relaxed_periods.copy(), len(new_relaxed_periods)

    merged = existing_periods.copy()
    count_standalone = 0

    for relaxed in new_relaxed_periods:
        # Skip if this exact period is already in existing_periods (duplicate from previous relaxation attempt)
        # Compare current start/end (before any splitting), not original_start/end
        # Note: original_start/end are set AFTER splitting and indicate split segments from same source
        relaxed_start = relaxed["start"]
        relaxed_end = relaxed["end"]

        is_duplicate = False
        for existing in existing_periods:
            # Only compare with existing periods that haven't been adjusted (unsplit originals)
            # If existing has original_start/end, it's already a split segment - skip comparison
            if "original_start" in existing:
                continue

            existing_start = existing["start"]
            existing_end = existing["end"]

            # Duplicate if same boundaries (within 1 minute tolerance)
            tolerance_seconds = 60  # 1 minute tolerance for duplicate detection
            if (
                abs((relaxed_start - existing_start).total_seconds()) < tolerance_seconds
                and abs((relaxed_end - existing_end).total_seconds()) < tolerance_seconds
            ):
                is_duplicate = True
                _LOGGER.debug(
                    "%sSkipping duplicate period %s-%s (already exists from previous relaxation)",
                    INDENT_L1,
                    relaxed_start.strftime("%H:%M"),
                    relaxed_end.strftime("%H:%M"),
                )
                break

        if is_duplicate:
            continue

        # Find all overlapping existing periods
        overlaps = []
        for existing in existing_periods:
            existing_start = existing["start"]
            existing_end = existing["end"]

            # Check for overlap
            if relaxed_start < existing_end and relaxed_end > existing_start:
                overlaps.append((existing_start, existing_end))

        if not overlaps:
            # No overlap - check if adjacent to baseline period (= extension)
            # Only baseline extensions don't count toward min_periods
            is_extension = False
            for baseline in baseline_periods:
                if relaxed_end == baseline["start"] or relaxed_start == baseline["end"]:
                    is_extension = True
                    break

            if is_extension:
                relaxed["is_extension"] = True
                _LOGGER.debug(
                    "%sMarking period %s-%s as extension (no overlap, adjacent to baseline)",
                    INDENT_L1,
                    relaxed_start.strftime("%H:%M"),
                    relaxed_end.strftime("%H:%M"),
                )
            else:
                count_standalone += 1

            merged.append(relaxed)
        else:
            # Has overlaps - check if this new period extends BASELINE periods
            # Extension = new period encompasses/extends baseline period(s)
            # Note: If new period encompasses OTHER RELAXED periods, that's a replacement, not extension!
            is_extension = False
            periods_to_replace = []

            for existing in existing_periods:
                existing_start = existing["start"]
                existing_end = existing["end"]

                # Check if new period completely encompasses existing period
                if relaxed_start <= existing_start and relaxed_end >= existing_end:
                    # Is this existing period a BASELINE period?
                    is_baseline = any(
                        bp["start"] == existing_start and bp["end"] == existing_end for bp in baseline_periods
                    )

                    if is_baseline:
                        # Extension of baseline → counts as extension
                        is_extension = True
                        _LOGGER.debug(
                            "%sNew period %s-%s extends BASELINE period %s-%s",
                            INDENT_L1,
                            relaxed_start.strftime("%H:%M"),
                            relaxed_end.strftime("%H:%M"),
                            existing_start.strftime("%H:%M"),
                            existing_end.strftime("%H:%M"),
                        )
                    else:
                        # Encompasses another relaxed period → REPLACEMENT, not extension
                        periods_to_replace.append(existing)
                        _LOGGER.debug(
                            "%sNew period %s-%s replaces relaxed period %s-%s (larger is better)",
                            INDENT_L1,
                            relaxed_start.strftime("%H:%M"),
                            relaxed_end.strftime("%H:%M"),
                            existing_start.strftime("%H:%M"),
                            existing_end.strftime("%H:%M"),
                        )

            # Remove periods that are being replaced by this larger period
            if periods_to_replace:
                for period_to_remove in periods_to_replace:
                    if period_to_remove in merged:
                        merged.remove(period_to_remove)
                        _LOGGER.debug(
                            "%sReplaced period %s-%s with larger period %s-%s",
                            INDENT_L2,
                            period_to_remove["start"].strftime("%H:%M"),
                            period_to_remove["end"].strftime("%H:%M"),
                            relaxed_start.strftime("%H:%M"),
                            relaxed_end.strftime("%H:%M"),
                        )

            # Split the relaxed period into non-overlapping segments
            segments = split_period_by_overlaps(relaxed_start, relaxed_end, overlaps)

            # If no segments (completely overlapped), but we replaced periods, add the full period
            if not segments and periods_to_replace:
                _LOGGER.debug(
                    "%sAdding full replacement period %s-%s (no non-overlapping segments)",
                    INDENT_L2,
                    relaxed_start.strftime("%H:%M"),
                    relaxed_end.strftime("%H:%M"),
                )
                # Mark as extension if it extends baseline, otherwise standalone
                if is_extension:
                    relaxed["is_extension"] = True
                merged.append(relaxed)
                continue

            for seg_start, seg_end in segments:
                # Calculate segment duration in minutes
                segment_duration_minutes = int((seg_end - seg_start).total_seconds() / 60)

                # Skip segment if it's too short
                if segment_duration_minutes < min_period_length:
                    continue

                # Create adjusted period segment
                adjusted_period = relaxed.copy()
                adjusted_period["start"] = seg_start
                adjusted_period["end"] = seg_end
                adjusted_period["duration_minutes"] = segment_duration_minutes

                # Mark as adjusted and potentially as extension
                adjusted_period["adjusted_for_overlap"] = True
                adjusted_period["original_start"] = relaxed_start
                adjusted_period["original_end"] = relaxed_end

                # If the original period was an extension, all its segments are extensions too
                # OR if segment is adjacent to baseline
                segment_is_extension = is_extension
                if not segment_is_extension:
                    # Check if segment is directly adjacent to BASELINE period
                    for baseline in baseline_periods:
                        if seg_end == baseline["start"] or seg_start == baseline["end"]:
                            segment_is_extension = True
                            break

                if segment_is_extension:
                    adjusted_period["is_extension"] = True
                    _LOGGER.debug(
                        "%sMarking segment %s-%s as extension (original was extension or adjacent to baseline)",
                        INDENT_L2,
                        seg_start.strftime("%H:%M"),
                        seg_end.strftime("%H:%M"),
                    )
                else:
                    # Standalone segment counts toward min_periods
                    count_standalone += 1

                merged.append(adjusted_period)

    # Sort all periods by start time
    merged.sort(key=lambda p: p["start"])

    # Count ACTUAL standalone periods in final merged list (not just newly added ones)
    # This accounts for replacements where old standalone was replaced by new standalone
    final_standalone_count = len([p for p in merged if not p.get("is_extension")])

    # Subtract baseline standalone count to get NEW standalone from this relaxation
    baseline_standalone_count = len([p for p in baseline_periods if not p.get("is_extension")])
    new_standalone_count = final_standalone_count - baseline_standalone_count

    return merged, new_standalone_count
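
The splitting and minimum-length check that resolve_period_overlaps() applies to an overlapping relaxed period can be tried in isolation. The sketch below is not part of the module: it repeats the body of split_period_by_overlaps() so that it runs without Home Assistant, and the `at()` helper, the example date, and the 90-minute `MIN_PERIOD_LENGTH` are assumptions chosen only for illustration. The input is the example from the function's docstring.

```python
from datetime import datetime


def split_period_by_overlaps(period_start, period_end, overlaps):
    """Standalone copy of the module's helper so the example runs on its own."""
    segments = []
    current_pos = period_start
    for overlap_start, overlap_end in sorted(overlaps, key=lambda x: x[0]):
        # Keep the gap before this overlap, if there is one
        if current_pos < overlap_start:
            segments.append((current_pos, overlap_start))
        # Continue after the overlap
        current_pos = max(current_pos, overlap_end)
    # Keep whatever remains after the last overlap
    if current_pos < period_end:
        segments.append((current_pos, period_end))
    return segments


def at(hour):
    """Hypothetical helper: a timestamp on an arbitrary example day."""
    return datetime(2025, 11, 13, hour, 0)


MIN_PERIOD_LENGTH = 90  # minutes; assumed value for this example only

# Docstring example: period 09:00-15:00, overlaps 10:00-12:00 and 14:00-16:00
segments = split_period_by_overlaps(
    at(9), at(15), [(at(10), at(12)), (at(14), at(16))]
)

# Same duration check as in resolve_period_overlaps(): drop short segments
kept = [
    (start, end)
    for start, end in segments
    if int((end - start).total_seconds() / 60) >= MIN_PERIOD_LENGTH
]

for start, end in kept:
    print(f"{start:%H:%M}-{end:%H:%M}")
```

With these inputs the split yields 09:00-10:00 and 12:00-14:00, as in the docstring. Under the assumed 90-minute minimum only the 12:00-14:00 segment survives, because the 60-minute segment is discarded.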