mirror of
https://github.com/jpawlowski/hass.tibber_prices.git
synced 2026-05-28 18:43:40 +00:00
Implement a new service that progressively relaxes user-defined filters to ensure a result is always returned when price data is available. This includes three phases: halving the minimum distance from average, expanding level filters, and reducing duration. Impact: Users will receive results even when strict filters would otherwise yield no matches, improving the reliability of scheduling actions. feat(pricing): enhance scheduling actions with new parameters Introduce new parameters `smooth_outliers`, `min_distance_from_avg`, and `allow_relaxation` to scheduling actions, allowing for better control over price selection and ensuring results are meaningfully different from average prices. Impact: Users can now fine-tune their scheduling actions to avoid marginal savings and ensure more uniform pricing within selected windows. docs(scheduling): update documentation for new features Revise the scheduling actions documentation to include new parameters and their effects, such as outlier smoothing and minimum distance from average, along with examples for better user understanding. Impact: Users will have clearer guidance on how to utilize new features effectively in their automations. test(scheduling): add tests for new relaxation logic Implement unit tests to verify the behavior of the new relaxation logic in scheduling actions, ensuring that filters are correctly relaxed and results are returned as expected. Impact: Increased test coverage and reliability of the scheduling features.
351 lines
12 KiB
Python
351 lines
12 KiB
Python
"""
|
|
Pure algorithms for finding cheapest price windows.
|
|
|
|
Two independent algorithms:
|
|
1. find_cheapest_contiguous_window — Sliding window for appliance scheduling
|
|
2. find_cheapest_n_intervals — Cheapest N picks for flexible-load scheduling
|
|
|
|
These are stateless pure functions with no Home Assistant dependencies.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from datetime import datetime, timedelta
|
|
import statistics
|
|
from typing import Any
|
|
|
|
from custom_components.tibber_prices.utils.price import calculate_coefficient_of_variation
|
|
|
|
|
|
def find_cheapest_contiguous_window(
|
|
intervals: list[dict[str, Any]],
|
|
duration_intervals: int,
|
|
*,
|
|
reverse: bool = False,
|
|
) -> dict[str, Any] | None:
|
|
"""
|
|
Find the cheapest (or most expensive) contiguous window of exactly N intervals.
|
|
|
|
Uses a sliding window algorithm (O(n)) to find the window with the
|
|
lowest (or highest) average price.
|
|
|
|
Args:
|
|
intervals: Sorted list of price interval dicts with 'startsAt' and 'total' keys.
|
|
Must be pre-sorted by startsAt in ascending order.
|
|
duration_intervals: Number of consecutive intervals required.
|
|
reverse: If True, find the most expensive window instead of cheapest.
|
|
|
|
Returns:
|
|
Dict with window details (start, end, intervals, statistics),
|
|
or None if not enough intervals available.
|
|
|
|
"""
|
|
n = len(intervals)
|
|
if n == 0 or duration_intervals <= 0 or n < duration_intervals:
|
|
return None
|
|
|
|
# Calculate initial window sum
|
|
window_sum = sum(intervals[i]["total"] for i in range(duration_intervals))
|
|
best_sum = window_sum
|
|
best_start = 0
|
|
|
|
# Slide the window
|
|
for i in range(1, n - duration_intervals + 1):
|
|
window_sum += intervals[i + duration_intervals - 1]["total"]
|
|
window_sum -= intervals[i - 1]["total"]
|
|
if (window_sum > best_sum) if reverse else (window_sum < best_sum):
|
|
best_sum = window_sum
|
|
best_start = i
|
|
|
|
best_intervals = intervals[best_start : best_start + duration_intervals]
|
|
return {
|
|
"start": best_intervals[0]["startsAt"],
|
|
"end_interval_start": best_intervals[-1]["startsAt"],
|
|
"intervals": best_intervals,
|
|
}
|
|
|
|
|
|
def find_cheapest_n_intervals(
|
|
intervals: list[dict[str, Any]],
|
|
count: int,
|
|
min_segment_intervals: int = 1,
|
|
*,
|
|
reverse: bool = False,
|
|
) -> dict[str, Any] | None:
|
|
"""
|
|
Find the cheapest (or most expensive) N intervals, not necessarily contiguous.
|
|
|
|
Picks the cheapest (or most expensive) intervals by price, then groups them
|
|
into contiguous segments. If min_segment_intervals > 1, short segments are
|
|
discarded and replaced with next-cheapest/most-expensive available intervals
|
|
until all segments meet the minimum length.
|
|
|
|
Args:
|
|
intervals: Sorted list of price interval dicts with 'startsAt' and 'total' keys.
|
|
Must be pre-sorted by startsAt in ascending order.
|
|
count: Number of intervals to select.
|
|
min_segment_intervals: Minimum contiguous length for each segment.
|
|
Default 1 means no constraint.
|
|
reverse: If True, find the most expensive intervals instead of cheapest.
|
|
|
|
Returns:
|
|
Dict with schedule details (segments, intervals, statistics),
|
|
or None if not enough intervals available.
|
|
|
|
"""
|
|
n = len(intervals)
|
|
if n == 0 or count <= 0 or n < count:
|
|
return None
|
|
|
|
if min_segment_intervals <= 1:
|
|
# Simple case: pick cheapest/most expensive N, then sort chronologically
|
|
indexed = [(i, iv) for i, iv in enumerate(intervals)]
|
|
indexed.sort(key=lambda x: x[1]["total"], reverse=reverse)
|
|
selected_indices = sorted(idx for idx, _ in indexed[:count])
|
|
selected = [intervals[i] for i in selected_indices]
|
|
segments = group_intervals_into_segments(selected)
|
|
return {
|
|
"intervals": selected,
|
|
"segments": segments,
|
|
}
|
|
|
|
# Complex case: enforce minimum segment length
|
|
return _find_with_min_segment(intervals, count, min_segment_intervals, reverse=reverse)
|
|
|
|
|
|
def _find_with_min_segment(
|
|
intervals: list[dict[str, Any]],
|
|
count: int,
|
|
min_segment: int,
|
|
*,
|
|
reverse: bool = False,
|
|
) -> dict[str, Any] | None:
|
|
"""
|
|
Find cheapest/most expensive N intervals with minimum segment length constraint.
|
|
|
|
Iteratively picks intervals, discards segments that are too
|
|
short, and replaces them with next-best alternatives.
|
|
|
|
Converges in at most `count` iterations (worst case: every replacement
|
|
creates a new short segment that gets discarded).
|
|
"""
|
|
n = len(intervals)
|
|
|
|
# Build index lookup: interval original index → position
|
|
# Price-sorted indices for picking cheapest/most expensive available
|
|
price_order = sorted(range(n), key=lambda i: intervals[i]["total"], reverse=reverse)
|
|
|
|
selected: set[int] = set()
|
|
excluded: set[int] = set()
|
|
|
|
# Initial pick: cheapest 'count' intervals
|
|
picked = 0
|
|
for idx in price_order:
|
|
if picked >= count:
|
|
break
|
|
if idx not in excluded:
|
|
selected.add(idx)
|
|
picked += 1
|
|
|
|
if len(selected) < count:
|
|
return None
|
|
|
|
# Iterative refinement: discard short segments, replace with next-cheapest
|
|
max_iterations = count + 1 # Safety bound
|
|
for _ in range(max_iterations):
|
|
sorted_selected = sorted(selected)
|
|
segments = _group_indices_into_segments(sorted_selected)
|
|
|
|
short_segments = [seg for seg in segments if len(seg) < min_segment]
|
|
if not short_segments:
|
|
break # All segments meet minimum length
|
|
|
|
# Exclude all indices in short segments
|
|
for seg in short_segments:
|
|
for idx in seg:
|
|
selected.discard(idx)
|
|
excluded.add(idx)
|
|
|
|
# Refill from price order
|
|
needed = count - len(selected)
|
|
for idx in price_order:
|
|
if needed <= 0:
|
|
break
|
|
if idx not in selected and idx not in excluded:
|
|
selected.add(idx)
|
|
needed -= 1
|
|
|
|
if len(selected) < count:
|
|
# Not enough intervals available after exclusions
|
|
# Return best effort with what we have
|
|
break
|
|
|
|
sorted_selected = sorted(selected)
|
|
result_intervals = [intervals[i] for i in sorted_selected]
|
|
segments = group_intervals_into_segments(result_intervals)
|
|
|
|
return {
|
|
"intervals": result_intervals,
|
|
"segments": segments,
|
|
}
|
|
|
|
|
|
def _group_indices_into_segments(indices: list[int]) -> list[list[int]]:
|
|
"""Group sorted integer indices into contiguous runs."""
|
|
if not indices:
|
|
return []
|
|
|
|
segments: list[list[int]] = [[indices[0]]]
|
|
for i in range(1, len(indices)):
|
|
if indices[i] == indices[i - 1] + 1:
|
|
segments[-1].append(indices[i])
|
|
else:
|
|
segments.append([indices[i]])
|
|
return segments
|
|
|
|
|
|
def group_intervals_into_segments(
|
|
intervals: list[dict[str, Any]],
|
|
) -> list[dict[str, Any]]:
|
|
"""
|
|
Group chronologically sorted intervals into contiguous segments.
|
|
|
|
Two intervals are contiguous if the second starts exactly 15 minutes
|
|
after the first.
|
|
|
|
Args:
|
|
intervals: Chronologically sorted interval dicts with 'startsAt' key.
|
|
|
|
Returns:
|
|
List of segment dicts, each containing:
|
|
- start: ISO timestamp of first interval
|
|
- end_interval_start: ISO timestamp of last interval in segment
|
|
- duration_minutes: Total segment duration
|
|
- interval_count: Number of intervals in segment
|
|
- intervals: The interval dicts in this segment
|
|
|
|
"""
|
|
if not intervals:
|
|
return []
|
|
|
|
segments: list[dict[str, Any]] = []
|
|
current_segment: list[dict[str, Any]] = [intervals[0]]
|
|
|
|
for i in range(1, len(intervals)):
|
|
prev_start = _parse_timestamp(intervals[i - 1]["startsAt"])
|
|
curr_start = _parse_timestamp(intervals[i]["startsAt"])
|
|
|
|
if curr_start - prev_start == timedelta(minutes=15):
|
|
current_segment.append(intervals[i])
|
|
else:
|
|
segments.append(_build_segment(current_segment))
|
|
current_segment = [intervals[i]]
|
|
|
|
segments.append(_build_segment(current_segment))
|
|
return segments
|
|
|
|
|
|
def _build_segment(intervals: list[dict[str, Any]]) -> dict[str, Any]:
|
|
"""Build a segment dict from a list of contiguous intervals."""
|
|
return {
|
|
"start": intervals[0]["startsAt"],
|
|
"end_interval_start": intervals[-1]["startsAt"],
|
|
"duration_minutes": len(intervals) * 15,
|
|
"interval_count": len(intervals),
|
|
"intervals": intervals,
|
|
}
|
|
|
|
|
|
def calculate_window_statistics(
|
|
intervals: list[dict[str, Any]],
|
|
unit_factor: int = 1,
|
|
round_decimals: int = 4,
|
|
interval_minutes: int = 15,
|
|
power_profile: list[int] | None = None,
|
|
) -> dict[str, float | None]:
|
|
"""
|
|
Calculate price statistics for a list of intervals.
|
|
|
|
Args:
|
|
intervals: List of interval dicts with 'total' key (base currency).
|
|
unit_factor: Multiplication factor for display (100 for subunit, 1 for base).
|
|
round_decimals: Number of decimal places for rounding.
|
|
interval_minutes: Duration of each interval in minutes (for cost calculation).
|
|
power_profile: Optional list of power values in watts, one per interval.
|
|
If shorter than the interval list, the last value is repeated.
|
|
When provided, estimated_total_cost reflects variable power draw instead
|
|
of a constant 1 kW load, and estimated_load_kwh is added to the result.
|
|
|
|
Returns:
|
|
Dict with price_mean, price_median, price_min, price_max, price_spread,
|
|
and estimated_total_cost (total cost for the given or constant 1 kW load).
|
|
When power_profile is provided, also includes estimated_load_kwh.
|
|
|
|
"""
|
|
if not intervals:
|
|
result: dict[str, float | None] = {
|
|
"price_mean": None,
|
|
"price_median": None,
|
|
"price_min": None,
|
|
"price_max": None,
|
|
"price_spread": None,
|
|
"coefficient_of_variation": None,
|
|
"estimated_total_cost": None,
|
|
}
|
|
if power_profile is not None:
|
|
result["estimated_load_kwh"] = None
|
|
return result
|
|
|
|
prices = [iv["total"] * unit_factor for iv in intervals]
|
|
mean = round(statistics.mean(prices), round_decimals)
|
|
median = round(statistics.median(prices), round_decimals)
|
|
price_min = round(min(prices), round_decimals)
|
|
price_max = round(max(prices), round_decimals)
|
|
spread = round(price_max - price_min, round_decimals)
|
|
|
|
hours_per_interval = interval_minutes / 60
|
|
|
|
# Calculate coefficient of variation (CV) as quality indicator
|
|
cv = calculate_coefficient_of_variation(prices)
|
|
cv_rounded = round(cv, round_decimals) if cv is not None else None
|
|
|
|
if power_profile is not None:
|
|
# Extend profile to cover all intervals by repeating the last value
|
|
last_watts = power_profile[-1] if power_profile else 1000
|
|
profile = list(power_profile) + [last_watts] * max(0, len(intervals) - len(power_profile))
|
|
load_kwh_per_interval = [w / 1000 * hours_per_interval for w in profile[: len(intervals)]]
|
|
estimated_cost = round(
|
|
sum(p * kwh for p, kwh in zip(prices, load_kwh_per_interval, strict=False)), round_decimals
|
|
)
|
|
estimated_load_kwh = round(sum(load_kwh_per_interval), round_decimals)
|
|
return {
|
|
"price_mean": mean,
|
|
"price_median": median,
|
|
"price_min": price_min,
|
|
"price_max": price_max,
|
|
"price_spread": spread,
|
|
"coefficient_of_variation": cv_rounded,
|
|
"estimated_total_cost": estimated_cost,
|
|
"estimated_load_kwh": estimated_load_kwh,
|
|
}
|
|
|
|
# Estimated cost for running a 1 kW constant load during all intervals
|
|
# Each interval covers interval_minutes/60 hours, price is per kWh
|
|
estimated_cost = round(sum(p * hours_per_interval for p in prices), round_decimals)
|
|
|
|
return {
|
|
"price_mean": mean,
|
|
"price_median": median,
|
|
"price_min": price_min,
|
|
"price_max": price_max,
|
|
"price_spread": spread,
|
|
"coefficient_of_variation": cv_rounded,
|
|
"estimated_total_cost": estimated_cost,
|
|
}
|
|
|
|
|
|
def _parse_timestamp(ts: str | datetime) -> datetime:
|
|
"""Parse an ISO timestamp string or pass through a datetime object."""
|
|
if isinstance(ts, datetime):
|
|
return ts
|
|
return datetime.fromisoformat(ts)
|