mirror of
https://github.com/jpawlowski/hass.tibber_prices.git
synced 2026-05-28 18:43:40 +00:00
Implement a new service that progressively relaxes user-defined filters to ensure a result is always returned when price data is available. This includes three phases: halving the minimum distance from average, expanding level filters, and reducing duration. Impact: Users will receive results even when strict filters would otherwise yield no matches, improving the reliability of scheduling actions. feat(pricing): enhance scheduling actions with new parameters Introduce new parameters `smooth_outliers`, `min_distance_from_avg`, and `allow_relaxation` to scheduling actions, allowing for better control over price selection and ensuring results are meaningfully different from average prices. Impact: Users can now fine-tune their scheduling actions to avoid marginal savings and ensure more uniform pricing within selected windows. docs(scheduling): update documentation for new features Revise the scheduling actions documentation to include new parameters and their effects, such as outlier smoothing and minimum distance from average, along with examples for better user understanding. Impact: Users will have clearer guidance on how to utilize new features effectively in their automations. test(scheduling): add tests for new relaxation logic Implement unit tests to verify the behavior of the new relaxation logic in scheduling actions, ensuring that filters are correctly relaxed and results are returned as expected. Impact: Increased test coverage and reliability of the scheduling features.
661 lines
23 KiB
Python
661 lines
23 KiB
Python
"""
|
|
Shared utilities for service handlers.
|
|
|
|
This module provides common helper functions used across multiple service handlers,
|
|
such as entry validation, data extraction, timezone resolution, and search range handling.
|
|
|
|
Functions:
|
|
get_entry_and_data: Validate config entry and extract coordinator data
|
|
has_tomorrow_data: Check if tomorrow's price data is available
|
|
resolve_home_timezone: Extract home timezone from coordinator
|
|
localize_to_home_tz: Localize datetime to Tibber home timezone
|
|
calculate_end_of_tomorrow: Calculate end of tomorrow in home timezone
|
|
floor_to_quarter_hour: Floor datetime to quarter-hour boundary
|
|
apply_must_finish_by: Convert must_finish_by deadline to search_end
|
|
resolve_search_range: Resolve search start/end from various input formats
|
|
filter_intervals_by_price_level: Filter intervals by Tibber price level
|
|
VALID_SEARCH_SCOPES: Set of valid search_scope shorthand values
|
|
PRICE_LEVEL_ORDER: Ordered tuple of price levels (lowest to highest)
|
|
|
|
Used by:
|
|
- services/chartdata.py: Chart data export service
|
|
- services/apexcharts.py: ApexCharts YAML generation
|
|
- services/refresh_user_data.py: User data refresh
|
|
- services/find_cheapest_block.py: Block service (cheapest + most expensive)
|
|
- services/find_cheapest_hours.py: Hours service (cheapest + most expensive)
|
|
- services/find_most_expensive_block.py: Most expensive block wrapper
|
|
- services/find_most_expensive_hours.py: Most expensive hours wrapper
|
|
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from datetime import datetime, time as dt_time, timedelta
|
|
from typing import TYPE_CHECKING, Any
|
|
|
|
from custom_components.tibber_prices.const import DOMAIN
|
|
from custom_components.tibber_prices.coordinator.helpers import get_intervals_for_day_offsets
|
|
from homeassistant.exceptions import ServiceValidationError
|
|
from homeassistant.util import dt as dt_util
|
|
|
|
if TYPE_CHECKING:
|
|
from zoneinfo import ZoneInfo
|
|
|
|
from custom_components.tibber_prices.coordinator import TibberPricesDataUpdateCoordinator
|
|
from homeassistant.core import HomeAssistant
|
|
|
|
# Interval duration in minutes (quarter-hourly resolution)
|
|
INTERVAL_MINUTES = 15
|
|
|
|
# Default flexibility percentage for outlier smoothing in services
|
|
# Matches the period system default (15%) for consistency
|
|
DEFAULT_SERVICE_SMOOTHING_FLEX = 15.0
|
|
|
|
# Valid scopes for the search_scope shorthand parameter
|
|
VALID_SEARCH_SCOPES = frozenset({"today", "tomorrow", "remaining_today", "next_24h", "next_48h"})
|
|
|
|
# Price level hierarchy (lowest to highest)
|
|
PRICE_LEVEL_ORDER = ("VERY_CHEAP", "CHEAP", "NORMAL", "EXPENSIVE", "VERY_EXPENSIVE")
|
|
_PRICE_LEVEL_RANK: dict[str, int] = {lvl: i for i, lvl in enumerate(PRICE_LEVEL_ORDER)}
|
|
|
|
|
|
# Parameters that define explicit search range boundaries
|
|
_EXPLICIT_RANGE_PARAMS = frozenset(
|
|
{
|
|
"search_start",
|
|
"search_end",
|
|
"search_start_time",
|
|
"search_end_time",
|
|
"search_start_offset_minutes",
|
|
"search_end_offset_minutes",
|
|
"search_start_day_offset",
|
|
"search_end_day_offset",
|
|
"must_finish_by",
|
|
}
|
|
)
|
|
|
|
|
|
def validate_search_params(call_data: dict[str, Any]) -> None:
|
|
"""
|
|
Validate search range parameter combinations.
|
|
|
|
Checks for mutually exclusive parameters and required co-dependencies.
|
|
Must be called before resolve_search_range().
|
|
|
|
Raises:
|
|
ServiceValidationError: If parameter combinations are invalid
|
|
|
|
"""
|
|
has_scope = "search_scope" in call_data
|
|
|
|
# search_scope conflicts with all explicit range parameters
|
|
if has_scope:
|
|
# day_offset params always appear (voluptuous defaults to 0), exclude from conflict check
|
|
conflicts = _EXPLICIT_RANGE_PARAMS - {"search_start_day_offset", "search_end_day_offset"}
|
|
conflicting = [p for p in conflicts if p in call_data]
|
|
if conflicting:
|
|
raise ServiceValidationError(
|
|
translation_domain=DOMAIN,
|
|
translation_key="scope_conflicts_with_range",
|
|
translation_placeholders={"params": ", ".join(sorted(conflicting))},
|
|
)
|
|
|
|
# must_finish_by conflicts with all end-boundary parameters
|
|
if "must_finish_by" in call_data:
|
|
end_conflicts = {"search_end", "search_end_time", "search_end_offset_minutes"}
|
|
conflicting_end = [p for p in end_conflicts if p in call_data]
|
|
if conflicting_end:
|
|
raise ServiceValidationError(
|
|
translation_domain=DOMAIN,
|
|
translation_key="must_finish_by_conflicts_with_end",
|
|
translation_placeholders={"params": ", ".join(sorted(conflicting_end))},
|
|
)
|
|
|
|
# search_start and search_start_time are mutually exclusive start-time specifications
|
|
if "search_start" in call_data and "search_start_time" in call_data:
|
|
raise ServiceValidationError(
|
|
translation_domain=DOMAIN,
|
|
translation_key="start_time_conflict",
|
|
)
|
|
if "search_end" in call_data and "search_end_time" in call_data:
|
|
raise ServiceValidationError(
|
|
translation_domain=DOMAIN,
|
|
translation_key="end_time_conflict",
|
|
)
|
|
|
|
# day_offset without matching time parameter is meaningless
|
|
# Schema defaults provide 0, but user explicitly setting non-zero without time is an error.
|
|
# We detect explicit usage by checking for non-default values when time is absent.
|
|
if "search_start_time" not in call_data and call_data.get("search_start_day_offset", 0) != 0:
|
|
raise ServiceValidationError(
|
|
translation_domain=DOMAIN,
|
|
translation_key="day_offset_requires_time",
|
|
translation_placeholders={"offset_param": "search_start_day_offset", "time_param": "search_start_time"},
|
|
)
|
|
if "search_end_time" not in call_data and call_data.get("search_end_day_offset", 0) != 0:
|
|
raise ServiceValidationError(
|
|
translation_domain=DOMAIN,
|
|
translation_key="day_offset_requires_time",
|
|
translation_placeholders={"offset_param": "search_end_day_offset", "time_param": "search_end_time"},
|
|
)
|
|
|
|
|
|
def apply_must_finish_by(
|
|
call_data: dict[str, Any],
|
|
home_tz: ZoneInfo,
|
|
) -> tuple[dict[str, Any], datetime | None]:
|
|
"""Convert must_finish_by deadline to search_end.
|
|
|
|
When must_finish_by is set, search_end is set to must_finish_by directly.
|
|
The interval pool uses exclusive end_time (intervals with startsAt < end_time),
|
|
so the latest found window/schedule naturally ends at search_end.
|
|
|
|
Args:
|
|
call_data: Service call data dict.
|
|
home_tz: Home timezone for datetime localization.
|
|
|
|
Returns:
|
|
Tuple of (possibly modified call_data, localized must_finish_by datetime or None).
|
|
If must_finish_by is absent, returns the original call_data unchanged.
|
|
|
|
"""
|
|
if "must_finish_by" not in call_data:
|
|
return call_data, None
|
|
|
|
must_finish_by = localize_to_home_tz(call_data["must_finish_by"], home_tz)
|
|
|
|
modified = dict(call_data)
|
|
modified["search_end"] = must_finish_by
|
|
del modified["must_finish_by"]
|
|
return modified, must_finish_by
|
|
|
|
|
|
def validate_price_level_range(
|
|
min_price_level: str | None,
|
|
max_price_level: str | None,
|
|
) -> None:
|
|
"""
|
|
Validate that min_price_level <= max_price_level in the level hierarchy.
|
|
|
|
Raises:
|
|
ServiceValidationError: If min level is higher than max level
|
|
|
|
"""
|
|
if min_price_level is None or max_price_level is None:
|
|
return
|
|
|
|
min_rank = _PRICE_LEVEL_RANK.get(min_price_level.upper(), 0)
|
|
max_rank = _PRICE_LEVEL_RANK.get(max_price_level.upper(), len(PRICE_LEVEL_ORDER) - 1)
|
|
|
|
if min_rank > max_rank:
|
|
raise ServiceValidationError(
|
|
translation_domain=DOMAIN,
|
|
translation_key="min_level_exceeds_max",
|
|
translation_placeholders={"min_level": min_price_level, "max_level": max_price_level},
|
|
)
|
|
|
|
|
|
def validate_power_profile_length(
|
|
power_profile: list[int] | None,
|
|
duration_intervals: int,
|
|
) -> None:
|
|
"""
|
|
Validate that power_profile length matches the number of intervals.
|
|
|
|
Raises:
|
|
ServiceValidationError: If lengths don't match
|
|
|
|
"""
|
|
if power_profile is None:
|
|
return
|
|
|
|
if len(power_profile) != duration_intervals:
|
|
raise ServiceValidationError(
|
|
translation_domain=DOMAIN,
|
|
translation_key="power_profile_length_mismatch",
|
|
translation_placeholders={
|
|
"profile_length": str(len(power_profile)),
|
|
"interval_count": str(duration_intervals),
|
|
"duration_minutes": str(duration_intervals * INTERVAL_MINUTES),
|
|
},
|
|
)
|
|
|
|
|
|
def get_entry_and_data(hass: HomeAssistant, entry_id: str) -> tuple[Any, Any, dict]:
|
|
"""
|
|
Validate entry and extract coordinator and data.
|
|
|
|
If entry_id is empty, auto-selects the single config entry for this domain.
|
|
Raises an error if there are zero or multiple entries and no entry_id is given.
|
|
|
|
Args:
|
|
hass: Home Assistant instance
|
|
entry_id: Config entry ID to validate (empty string to auto-select)
|
|
|
|
Returns:
|
|
Tuple of (entry, coordinator, data)
|
|
|
|
Raises:
|
|
ServiceValidationError: If entry cannot be resolved
|
|
|
|
"""
|
|
if not entry_id:
|
|
entries = hass.config_entries.async_entries(DOMAIN)
|
|
if len(entries) == 1:
|
|
entry = entries[0]
|
|
elif len(entries) == 0:
|
|
raise ServiceValidationError(translation_domain=DOMAIN, translation_key="no_entries_found")
|
|
else:
|
|
raise ServiceValidationError(translation_domain=DOMAIN, translation_key="multiple_entries_no_entry_id")
|
|
else:
|
|
entry = next(
|
|
(e for e in hass.config_entries.async_entries(DOMAIN) if e.entry_id == entry_id),
|
|
None,
|
|
)
|
|
if not entry or not hasattr(entry, "runtime_data") or not entry.runtime_data:
|
|
raise ServiceValidationError(translation_domain=DOMAIN, translation_key="invalid_entry_id")
|
|
coordinator = entry.runtime_data.coordinator
|
|
data = coordinator.data or {}
|
|
return entry, coordinator, data
|
|
|
|
|
|
def has_tomorrow_data(coordinator: TibberPricesDataUpdateCoordinator) -> bool:
|
|
"""
|
|
Check if tomorrow's price data is available in coordinator.
|
|
|
|
Uses get_intervals_for_day_offsets() to automatically determine tomorrow
|
|
based on current date.
|
|
|
|
Args:
|
|
coordinator: TibberPricesDataUpdateCoordinator instance
|
|
|
|
Returns:
|
|
True if tomorrow's data exists (at least one interval), False otherwise
|
|
|
|
"""
|
|
coordinator_data = coordinator.data or {}
|
|
tomorrow_intervals = get_intervals_for_day_offsets(coordinator_data, [1])
|
|
return len(tomorrow_intervals) > 0
|
|
|
|
|
|
def resolve_home_timezone(
|
|
coordinator: Any,
|
|
home_id: str,
|
|
) -> str:
|
|
"""Extract home timezone from coordinator's cached user data."""
|
|
user_data = coordinator._cached_user_data # noqa: SLF001
|
|
if not user_data:
|
|
raise ServiceValidationError(
|
|
translation_domain=DOMAIN,
|
|
translation_key="user_data_not_available",
|
|
)
|
|
|
|
if "viewer" in user_data:
|
|
for home in user_data["viewer"].get("homes", []):
|
|
if home.get("id") == home_id:
|
|
tz = home.get("timeZone")
|
|
if tz:
|
|
return tz
|
|
|
|
raise ServiceValidationError(
|
|
translation_domain=DOMAIN,
|
|
translation_key="timezone_not_found",
|
|
)
|
|
|
|
|
|
def localize_to_home_tz(dt_value: datetime, home_tz: ZoneInfo) -> datetime:
|
|
"""
|
|
Localize a datetime to the Tibber home timezone.
|
|
|
|
Handles the critical two-step process:
|
|
1. GUI naive datetime → localize to HA server timezone
|
|
2. Convert from HA timezone to home timezone
|
|
"""
|
|
if dt_value.tzinfo is None:
|
|
dt_value = dt_util.as_local(dt_value)
|
|
return dt_value.astimezone(home_tz)
|
|
|
|
|
|
def calculate_end_of_tomorrow(home_tz: ZoneInfo) -> datetime:
|
|
"""Calculate end of tomorrow in home timezone."""
|
|
now_home = dt_util.now().astimezone(home_tz)
|
|
tomorrow = (now_home + timedelta(days=1)).date()
|
|
# End of tomorrow = midnight at start of day after tomorrow
|
|
return now_home.replace(
|
|
year=tomorrow.year,
|
|
month=tomorrow.month,
|
|
day=tomorrow.day,
|
|
hour=0,
|
|
minute=0,
|
|
second=0,
|
|
microsecond=0,
|
|
) + timedelta(days=1)
|
|
|
|
|
|
def floor_to_quarter_hour(dt_value: datetime) -> datetime:
|
|
"""Floor a datetime to the current quarter-hour boundary."""
|
|
return dt_value.replace(minute=(dt_value.minute // INTERVAL_MINUTES) * INTERVAL_MINUTES, second=0, microsecond=0)
|
|
|
|
|
|
def _resolve_time_with_day_offset(
|
|
time_value: dt_time,
|
|
day_offset: int,
|
|
home_tz: ZoneInfo,
|
|
now: datetime,
|
|
) -> datetime:
|
|
"""Resolve a time-of-day + day offset to a full datetime in home timezone."""
|
|
now_home = now.astimezone(home_tz)
|
|
target_date = (now_home + timedelta(days=day_offset)).date()
|
|
return datetime(
|
|
year=target_date.year,
|
|
month=target_date.month,
|
|
day=target_date.day,
|
|
hour=time_value.hour,
|
|
minute=time_value.minute,
|
|
second=time_value.second,
|
|
tzinfo=home_tz,
|
|
)
|
|
|
|
|
|
def _resolve_scope(scope: str, now: datetime, _home_tz: ZoneInfo) -> tuple[datetime, datetime]:
|
|
"""
|
|
Convert a search_scope shorthand into explicit start/end datetimes.
|
|
|
|
Args:
|
|
scope: One of "today", "tomorrow", "remaining_today", "next_24h", "next_48h"
|
|
now: Current datetime in home timezone
|
|
home_tz: Home timezone for date calculations
|
|
|
|
Returns:
|
|
Tuple of (start, end) datetimes in home timezone
|
|
|
|
"""
|
|
today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
|
|
tomorrow_start = today_start + timedelta(days=1)
|
|
day_after_start = today_start + timedelta(days=2)
|
|
|
|
if scope == "today":
|
|
return today_start, tomorrow_start
|
|
if scope == "tomorrow":
|
|
return tomorrow_start, day_after_start
|
|
if scope == "remaining_today":
|
|
return floor_to_quarter_hour(now), tomorrow_start
|
|
if scope == "next_24h":
|
|
return floor_to_quarter_hour(now), now + timedelta(hours=24)
|
|
if scope == "next_48h":
|
|
return floor_to_quarter_hour(now), now + timedelta(hours=48)
|
|
|
|
raise ServiceValidationError(
|
|
translation_domain=DOMAIN,
|
|
translation_key="invalid_search_scope",
|
|
)
|
|
|
|
|
|
def filter_intervals_by_price_level(
|
|
intervals: list[dict[str, Any]],
|
|
min_price_level: str | None,
|
|
max_price_level: str | None,
|
|
) -> list[dict[str, Any]]:
|
|
"""
|
|
Filter intervals by Tibber price level.
|
|
|
|
Keeps only intervals whose 'level' field is within the requested range.
|
|
If an interval has no 'level' field it is kept (avoids silently dropping data on API changes).
|
|
|
|
Args:
|
|
intervals: Price interval dicts with optional 'level' key
|
|
min_price_level: Lower bound level (inclusive), e.g. "CHEAP"
|
|
max_price_level: Upper bound level (inclusive), e.g. "NORMAL"
|
|
|
|
Returns:
|
|
Filtered list; same list reference if no filter is active
|
|
|
|
"""
|
|
if min_price_level is None and max_price_level is None:
|
|
return intervals
|
|
|
|
min_rank = _PRICE_LEVEL_RANK.get(min_price_level.upper(), 0) if min_price_level else 0
|
|
max_rank = (
|
|
_PRICE_LEVEL_RANK.get(max_price_level.upper(), len(PRICE_LEVEL_ORDER) - 1)
|
|
if max_price_level
|
|
else len(PRICE_LEVEL_ORDER) - 1
|
|
)
|
|
|
|
result = []
|
|
for iv in intervals:
|
|
level = iv.get("level")
|
|
if level is None:
|
|
result.append(iv)
|
|
continue
|
|
rank = _PRICE_LEVEL_RANK.get(str(level).upper())
|
|
if rank is None:
|
|
result.append(iv)
|
|
continue
|
|
if min_rank <= rank <= max_rank:
|
|
result.append(iv)
|
|
return result
|
|
|
|
|
|
def build_rating_lookup(coordinator_data: dict[str, Any]) -> dict[str, str | None]:
|
|
"""
|
|
Build a startsAt → rating_level lookup from enriched coordinator data.
|
|
|
|
The coordinator's priceInfo contains rating_level (LOW/NORMAL/HIGH) computed
|
|
from trailing 24h averages with hysteresis. Pool intervals lack this field,
|
|
so this lookup allows annotating service responses with rating_level.
|
|
|
|
Args:
|
|
coordinator_data: coordinator.data dict with enriched priceInfo
|
|
|
|
Returns:
|
|
Dict mapping startsAt ISO string to lowercase rating_level (or None)
|
|
|
|
"""
|
|
lookup: dict[str, str | None] = {}
|
|
for iv in coordinator_data.get("priceInfo", []):
|
|
starts_at = iv.get("startsAt")
|
|
rating = iv.get("rating_level")
|
|
if starts_at:
|
|
lookup[starts_at] = rating.lower() if isinstance(rating, str) else None
|
|
return lookup
|
|
|
|
|
|
def build_response_interval(
|
|
iv: dict[str, Any],
|
|
unit_factor: int,
|
|
rating_lookup: dict[str, str | None],
|
|
) -> dict[str, Any]:
|
|
"""
|
|
Build an enriched interval dict for service responses.
|
|
|
|
Converts a raw pool interval into a companion-friendly format with
|
|
ends_at, level, and rating_level fields.
|
|
|
|
Args:
|
|
iv: Raw interval dict from pool (startsAt, total, level, ...)
|
|
unit_factor: Price unit multiplier (1 for base unit, 100 for cents, etc.)
|
|
rating_lookup: startsAt → rating_level mapping from coordinator data
|
|
|
|
Returns:
|
|
Enriched interval dict for service response
|
|
|
|
"""
|
|
starts_at = iv["startsAt"]
|
|
if isinstance(starts_at, str):
|
|
ends_at = (datetime.fromisoformat(starts_at) + timedelta(minutes=INTERVAL_MINUTES)).isoformat()
|
|
else:
|
|
ends_at = (starts_at + timedelta(minutes=INTERVAL_MINUTES)).isoformat()
|
|
|
|
return {
|
|
"starts_at": starts_at,
|
|
"ends_at": ends_at,
|
|
"price": round(iv["total"] * unit_factor, 4),
|
|
"level": (iv.get("level") or "").lower() or None,
|
|
"rating_level": rating_lookup.get(starts_at),
|
|
}
|
|
|
|
|
|
def resolve_search_range(
|
|
call_data: dict[str, Any],
|
|
now: datetime,
|
|
home_tz: ZoneInfo,
|
|
) -> tuple[datetime, datetime]:
|
|
"""
|
|
Resolve search start/end from scope shorthand, explicit datetime, time+offset, or defaults.
|
|
|
|
Priority (highest to lowest):
|
|
0. search_scope shorthand (today, tomorrow, remaining_today, next_24h, next_48h)
|
|
1. Explicit datetime (search_start / search_end)
|
|
2. Time-of-day + day offset (search_start_time + search_start_day_offset)
|
|
3. Minutes offset (search_start_offset_minutes / search_end_offset_minutes)
|
|
4. Default (now for start, end of tomorrow for end)
|
|
|
|
Calls validate_search_params() first to check for conflicting combinations.
|
|
"""
|
|
validate_search_params(call_data)
|
|
include_current = call_data.get("include_current_interval", True)
|
|
|
|
# Priority 0: search_scope shorthand
|
|
if "search_scope" in call_data:
|
|
return _resolve_scope(call_data["search_scope"], now, home_tz)
|
|
|
|
# --- Resolve start ---
|
|
if "search_start" in call_data:
|
|
search_start = localize_to_home_tz(call_data["search_start"], home_tz)
|
|
elif "search_start_time" in call_data:
|
|
day_offset = call_data.get("search_start_day_offset", 0)
|
|
search_start = _resolve_time_with_day_offset(call_data["search_start_time"], day_offset, home_tz, now)
|
|
elif "search_start_offset_minutes" in call_data:
|
|
search_start = now + timedelta(minutes=call_data["search_start_offset_minutes"])
|
|
if include_current:
|
|
search_start = floor_to_quarter_hour(search_start)
|
|
else:
|
|
search_start = floor_to_quarter_hour(now) if include_current else now
|
|
|
|
# --- Resolve end ---
|
|
if "search_end" in call_data:
|
|
search_end = localize_to_home_tz(call_data["search_end"], home_tz)
|
|
elif "search_end_time" in call_data:
|
|
day_offset = call_data.get("search_end_day_offset", 0)
|
|
search_end = _resolve_time_with_day_offset(call_data["search_end_time"], day_offset, home_tz, now)
|
|
elif "search_end_offset_minutes" in call_data:
|
|
search_end = now + timedelta(minutes=call_data["search_end_offset_minutes"])
|
|
else:
|
|
search_end = calculate_end_of_tomorrow(home_tz)
|
|
|
|
if search_end <= search_start:
|
|
raise ServiceValidationError(
|
|
translation_domain=DOMAIN,
|
|
translation_key="end_before_start",
|
|
translation_placeholders={
|
|
"search_start": search_start.strftime("%Y-%m-%d %H:%M %z"),
|
|
"search_end": search_end.strftime("%Y-%m-%d %H:%M %z"),
|
|
},
|
|
)
|
|
|
|
return search_start, search_end
|
|
|
|
|
|
def smooth_service_intervals(
|
|
intervals: list[dict[str, Any]],
|
|
) -> list[dict[str, Any]]:
|
|
"""
|
|
Apply outlier smoothing to price intervals for service window finding.
|
|
|
|
Reuses the period system's outlier filtering with a fixed flexibility
|
|
percentage. Smoothed intervals are used for window FINDING only —
|
|
original prices should be restored for response reporting.
|
|
|
|
Args:
|
|
intervals: Price intervals to smooth.
|
|
|
|
Returns:
|
|
New list of intervals with spike prices smoothed.
|
|
Smoothed intervals have '_original_total' preserving the real price.
|
|
|
|
"""
|
|
from custom_components.tibber_prices.coordinator.period_handlers.outlier_filtering import ( # noqa: PLC0415
|
|
filter_price_outliers,
|
|
)
|
|
|
|
return filter_price_outliers(intervals, DEFAULT_SERVICE_SMOOTHING_FLEX, 0)
|
|
|
|
|
|
def restore_original_prices(intervals: list[dict[str, Any]]) -> list[dict[str, Any]]:
|
|
"""
|
|
Restore original prices on intervals that were smoothed.
|
|
|
|
After using smoothed intervals for window finding, call this to get
|
|
intervals with true prices for response reporting.
|
|
|
|
Args:
|
|
intervals: Intervals possibly containing '_original_total' from smoothing.
|
|
|
|
Returns:
|
|
New list of intervals with original prices restored and smoothing
|
|
metadata removed.
|
|
|
|
"""
|
|
result = []
|
|
for iv in intervals:
|
|
if "_original_total" in iv:
|
|
restored = {k: v for k, v in iv.items() if k not in ("_smoothed", "_original_total")}
|
|
restored["total"] = iv["_original_total"]
|
|
result.append(restored)
|
|
else:
|
|
result.append(iv)
|
|
return result
|
|
|
|
|
|
def calculate_search_range_avg(intervals: list[dict[str, Any]]) -> float | None:
|
|
"""
|
|
Calculate average price across all intervals in the search range.
|
|
|
|
Used as reference for min_distance_from_avg validation.
|
|
|
|
Args:
|
|
intervals: All price intervals in the search range (unfiltered).
|
|
|
|
Returns:
|
|
Average price in base currency unit, or None if no intervals.
|
|
|
|
"""
|
|
if not intervals:
|
|
return None
|
|
return sum(iv["total"] for iv in intervals) / len(intervals)
|
|
|
|
|
|
def check_min_distance_from_avg(
|
|
window_mean_base: float,
|
|
range_avg: float,
|
|
min_distance_pct: float,
|
|
*,
|
|
reverse: bool,
|
|
) -> bool:
|
|
"""
|
|
Check if window mean price meets the minimum distance from range average.
|
|
|
|
For cheapest searches: window mean must be at least X% BELOW range average.
|
|
For most expensive searches: window mean must be at least X% ABOVE range average.
|
|
|
|
Args:
|
|
window_mean_base: Window mean price in BASE currency (not display unit).
|
|
range_avg: Search range average price in BASE currency.
|
|
min_distance_pct: Required distance as percentage (e.g. 5.0 = 5%).
|
|
reverse: True for most-expensive searches.
|
|
|
|
Returns:
|
|
True if the window passes the distance check.
|
|
|
|
"""
|
|
if range_avg == 0:
|
|
return True # Cannot calculate percentage difference from zero
|
|
|
|
distance_ratio = min_distance_pct / 100
|
|
if reverse:
|
|
# Most expensive: window mean must be >= avg * (1 + distance)
|
|
threshold = range_avg * (1 + distance_ratio)
|
|
return window_mean_base >= threshold
|
|
# Cheapest: window mean must be <= avg * (1 - distance)
|
|
threshold = range_avg * (1 - distance_ratio)
|
|
return window_mean_base <= threshold
|