mirror of
https://github.com/jpawlowski/hass.tibber_prices.git
synced 2026-05-28 18:43:40 +00:00
Add `include_current_interval` parameter to `find_cheapest_block` and `find_cheapest_schedule` services, controlling whether the currently active price interval can be the start of the selected window. Add power-profile weighting to `find_cheapest_contiguous_window`: accepts an optional `power_profile` list that weights each interval's price by relative power draw (e.g. heat-up phase heavier than steady state). Without a profile the behaviour is unchanged (uniform weighting). Extend search-range tests and add price-window unit tests covering weighted and unweighted scenarios, edge cases, and sequential scheduling interactions. Update scheduling-actions documentation with parameter and profile examples. Impact: Users can now model appliances with non-uniform power draw (e.g. heat pumps, washing machines) to find truly cheapest windows based on actual energy cost rather than average price.
495 lines
19 KiB
Python
495 lines
19 KiB
Python
"""
|
|
Service handler for find_cheapest_block and find_most_expensive_block services.
|
|
|
|
Finds the cheapest (or most expensive) contiguous window of a given duration
|
|
within a search range. Designed for appliance scheduling (dishwasher, washing
|
|
machine, dryer).
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from datetime import datetime, time as dt_time, timedelta
|
|
import logging
|
|
import math
|
|
from typing import TYPE_CHECKING, Any
|
|
|
|
import voluptuous as vol
|
|
|
|
from custom_components.tibber_prices.const import DOMAIN, get_display_unit_factor, get_display_unit_string
|
|
from custom_components.tibber_prices.utils.price_window import (
|
|
calculate_window_statistics,
|
|
find_cheapest_contiguous_window,
|
|
)
|
|
from homeassistant.exceptions import ServiceValidationError
|
|
from homeassistant.helpers import config_validation as cv
|
|
from homeassistant.util import dt as dt_util
|
|
|
|
from .entity_resolver import or_entity_ref, resolve_entity_references
|
|
from .helpers import (
|
|
INTERVAL_MINUTES,
|
|
PRICE_LEVEL_ORDER,
|
|
VALID_SEARCH_SCOPES,
|
|
apply_must_finish_by,
|
|
build_rating_lookup,
|
|
build_response_interval,
|
|
calculate_search_range_avg,
|
|
check_min_distance_from_avg,
|
|
filter_intervals_by_price_level,
|
|
get_entry_and_data,
|
|
resolve_home_timezone,
|
|
resolve_search_range,
|
|
restore_original_prices,
|
|
smooth_service_intervals,
|
|
validate_power_profile_length,
|
|
validate_price_level_range,
|
|
validate_search_params,
|
|
)
|
|
from .relaxation import (
|
|
MIN_RELAXED_DURATION_INTERVALS,
|
|
calculate_max_duration_reduction_intervals,
|
|
generate_relaxation_steps,
|
|
)
|
|
|
|
if TYPE_CHECKING:
|
|
from zoneinfo import ZoneInfo
|
|
|
|
from homeassistant.core import HomeAssistant, ServiceCall, ServiceResponse
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
FIND_CHEAPEST_BLOCK_SERVICE_NAME = "find_cheapest_block"
|
|
|
|
# Parameter types for entity reference resolution (param_name → expected Python type)
|
|
COMMON_BLOCK_ENTITY_PARAMS: dict[str, type] = {
|
|
"duration": timedelta,
|
|
"search_start": datetime,
|
|
"search_end": datetime,
|
|
"search_start_time": dt_time,
|
|
"search_end_time": dt_time,
|
|
"search_start_day_offset": int,
|
|
"search_end_day_offset": int,
|
|
"search_start_offset_minutes": int,
|
|
"search_end_offset_minutes": int,
|
|
"min_distance_from_avg": float,
|
|
"duration_flexibility_minutes": int,
|
|
"must_finish_by": datetime,
|
|
}
|
|
|
|
_COMMON_BLOCK_SCHEMA = {
|
|
vol.Optional("entry_id", default=""): cv.string,
|
|
vol.Required("duration"): or_entity_ref(
|
|
vol.All(cv.positive_time_period, vol.Range(min=timedelta(minutes=1), max=timedelta(hours=12))),
|
|
),
|
|
vol.Optional("search_start"): or_entity_ref(cv.datetime),
|
|
vol.Optional("search_end"): or_entity_ref(cv.datetime),
|
|
vol.Optional("search_start_time"): or_entity_ref(cv.time),
|
|
vol.Optional("search_start_day_offset", default=0): or_entity_ref(
|
|
vol.All(vol.Coerce(int), vol.Range(min=-7, max=2)),
|
|
),
|
|
vol.Optional("search_end_time"): or_entity_ref(cv.time),
|
|
vol.Optional("search_end_day_offset", default=0): or_entity_ref(
|
|
vol.All(vol.Coerce(int), vol.Range(min=-7, max=2)),
|
|
),
|
|
vol.Optional("search_start_offset_minutes"): or_entity_ref(
|
|
vol.All(vol.Coerce(int), vol.Range(min=-10080, max=10080)),
|
|
),
|
|
vol.Optional("search_end_offset_minutes"): or_entity_ref(
|
|
vol.All(vol.Coerce(int), vol.Range(min=-10080, max=10080)),
|
|
),
|
|
vol.Optional("search_scope"): vol.In(VALID_SEARCH_SCOPES),
|
|
vol.Optional("max_price_level"): vol.In([lvl.lower() for lvl in PRICE_LEVEL_ORDER]),
|
|
vol.Optional("min_price_level"): vol.In([lvl.lower() for lvl in PRICE_LEVEL_ORDER]),
|
|
vol.Optional("include_comparison_details", default=False): cv.boolean,
|
|
vol.Optional("power_profile"): vol.All(
|
|
[vol.All(vol.Coerce(int), vol.Range(min=1, max=100000))],
|
|
vol.Length(min=1, max=48),
|
|
),
|
|
vol.Optional("include_current_interval", default=True): cv.boolean,
|
|
vol.Optional("use_base_unit", default=False): cv.boolean,
|
|
vol.Optional("smooth_outliers", default=True): cv.boolean,
|
|
vol.Optional("min_distance_from_avg"): or_entity_ref(
|
|
vol.All(vol.Coerce(float), vol.Range(min=0.1, max=50.0)),
|
|
),
|
|
vol.Optional("allow_relaxation", default=True): cv.boolean,
|
|
vol.Optional("duration_flexibility_minutes"): or_entity_ref(
|
|
vol.All(vol.Coerce(int), vol.Range(min=0, max=120)),
|
|
),
|
|
vol.Optional("must_finish_by"): or_entity_ref(cv.datetime),
|
|
}
|
|
|
|
FIND_CHEAPEST_BLOCK_SERVICE_SCHEMA = vol.Schema(_COMMON_BLOCK_SCHEMA)
|
|
|
|
|
|
def _compute_price_comparison(
|
|
comparison_result: dict | None,
|
|
unit_factor: int,
|
|
stats: dict,
|
|
*,
|
|
reverse: bool,
|
|
include_details: bool = False,
|
|
) -> dict[str, float | str | None] | None:
|
|
"""Compute price comparison between the selected and opposite-direction window."""
|
|
if comparison_result is None:
|
|
return None
|
|
|
|
comparison_stats = calculate_window_statistics(
|
|
comparison_result["intervals"], unit_factor=unit_factor, round_decimals=4
|
|
)
|
|
if stats.get("price_mean") is None or comparison_stats.get("price_mean") is None:
|
|
return None
|
|
|
|
diff = round(comparison_stats["price_mean"] - stats["price_mean"], 4)
|
|
if reverse:
|
|
diff = -diff
|
|
|
|
result: dict[str, float | str | None] = {
|
|
"comparison_price_mean": comparison_stats["price_mean"],
|
|
"price_difference": abs(diff),
|
|
"comparison_window_start": (
|
|
comparison_result["intervals"][0]["startsAt"]
|
|
if isinstance(comparison_result["intervals"][0]["startsAt"], str)
|
|
else comparison_result["intervals"][0]["startsAt"].isoformat()
|
|
),
|
|
}
|
|
|
|
# Optional enrichment (P6)
|
|
if include_details:
|
|
result["comparison_price_min"] = comparison_stats.get("price_min")
|
|
result["comparison_price_max"] = comparison_stats.get("price_max")
|
|
last_start = comparison_result["intervals"][-1]["startsAt"]
|
|
if not isinstance(last_start, str):
|
|
last_start = last_start.isoformat()
|
|
result["comparison_window_end"] = (
|
|
datetime.fromisoformat(last_start) + timedelta(minutes=INTERVAL_MINUTES)
|
|
).isoformat()
|
|
|
|
return result
|
|
|
|
|
|
def _determine_no_window_reason(
|
|
price_info: list[dict],
|
|
filtered_price_info: list[dict],
|
|
duration_intervals: int,
|
|
*,
|
|
level_filter_active: bool,
|
|
) -> str:
|
|
"""Classify why no block window could be found."""
|
|
if not price_info:
|
|
return "no_data_in_range"
|
|
if level_filter_active and not filtered_price_info:
|
|
return "no_intervals_matching_level_filter"
|
|
if len(filtered_price_info) < duration_intervals:
|
|
return "insufficient_intervals_after_filter"
|
|
return "insufficient_contiguous_window"
|
|
|
|
|
|
def _attempt_find_block(
|
|
price_info: list[dict],
|
|
*,
|
|
max_price_level: str | None,
|
|
min_price_level: str | None,
|
|
duration_intervals: int,
|
|
smooth_outliers: bool,
|
|
min_distance_from_avg: float | None,
|
|
power_profile: list[int] | None,
|
|
reverse: bool,
|
|
) -> tuple[dict | None, str]:
|
|
"""Attempt to find a block with specific filter parameters.
|
|
|
|
Returns:
|
|
(result_dict, "") on success or (None, reason_code) on failure.
|
|
|
|
"""
|
|
level_filter_active = min_price_level is not None or max_price_level is not None
|
|
filtered = filter_intervals_by_price_level(price_info, min_price_level, max_price_level)
|
|
|
|
if smooth_outliers and filtered:
|
|
search_data = smooth_service_intervals(filtered)
|
|
else:
|
|
search_data = filtered
|
|
|
|
result = find_cheapest_contiguous_window(
|
|
search_data, duration_intervals, reverse=reverse, power_profile=power_profile
|
|
)
|
|
|
|
if result is None:
|
|
return None, _determine_no_window_reason(
|
|
price_info, filtered, duration_intervals, level_filter_active=level_filter_active
|
|
)
|
|
|
|
# Restore original prices (smoothing only affects window selection)
|
|
if smooth_outliers:
|
|
result["intervals"] = restore_original_prices(result["intervals"])
|
|
|
|
# Check distance constraint
|
|
if min_distance_from_avg is not None:
|
|
range_avg = calculate_search_range_avg(price_info)
|
|
window_mean = sum(iv["total"] for iv in result["intervals"]) / len(result["intervals"])
|
|
if range_avg is not None and not check_min_distance_from_avg(
|
|
window_mean, range_avg, min_distance_from_avg, reverse=reverse
|
|
):
|
|
return None, "window_above_distance_threshold" if not reverse else "window_below_distance_threshold"
|
|
|
|
return result, ""
|
|
|
|
|
|
async def _handle_find_block(
|
|
call: ServiceCall,
|
|
*,
|
|
reverse: bool = False,
|
|
) -> ServiceResponse:
|
|
"""
|
|
Core handler for finding price blocks (cheapest or most expensive).
|
|
|
|
Finds the cheapest/most expensive contiguous window of the requested
|
|
duration within the search range using a sliding window algorithm.
|
|
"""
|
|
service_label = "find_most_expensive_block" if reverse else "find_cheapest_block"
|
|
hass: HomeAssistant = call.hass
|
|
|
|
# Resolve entity references (e.g., "input_number.wash_duration" → 90 minutes)
|
|
data, resolved_refs = resolve_entity_references(hass, call.data, COMMON_BLOCK_ENTITY_PARAMS)
|
|
|
|
entry_id: str = data.get("entry_id", "")
|
|
duration_td: timedelta = data["duration"]
|
|
use_base_unit: bool = data.get("use_base_unit", False)
|
|
max_price_level: str | None = data.get("max_price_level")
|
|
min_price_level: str | None = data.get("min_price_level")
|
|
include_comparison_details: bool = data.get("include_comparison_details", False)
|
|
power_profile: list[int] | None = data.get("power_profile")
|
|
smooth_outliers: bool = data.get("smooth_outliers", True)
|
|
min_distance_from_avg: float | None = data.get("min_distance_from_avg")
|
|
allow_relaxation: bool = data.get("allow_relaxation", True)
|
|
duration_flexibility_minutes: int | None = data.get("duration_flexibility_minutes")
|
|
|
|
duration_minutes_requested = int(duration_td.total_seconds() / 60)
|
|
# Round up to nearest quarter-hour interval
|
|
duration_minutes = math.ceil(duration_minutes_requested / INTERVAL_MINUTES) * INTERVAL_MINUTES
|
|
|
|
entry, coordinator, data = get_entry_and_data(hass, entry_id)
|
|
rating_lookup = build_rating_lookup(data)
|
|
|
|
home_id = entry.data.get("home_id")
|
|
if not home_id:
|
|
raise ServiceValidationError(
|
|
translation_domain=DOMAIN,
|
|
translation_key="missing_home_id",
|
|
)
|
|
|
|
# Resolve timezone
|
|
home_timezone = resolve_home_timezone(coordinator, home_id)
|
|
home_tz: ZoneInfo
|
|
from zoneinfo import ZoneInfo # noqa: PLC0415
|
|
|
|
home_tz = ZoneInfo(home_timezone)
|
|
|
|
# Handle must_finish_by: convert deadline to search_end
|
|
validate_search_params(data)
|
|
effective_data, must_finish_by_dt = apply_must_finish_by(data, home_tz)
|
|
|
|
# Resolve search range (priority: explicit datetime > time+offset > minutes offset > default)
|
|
now = dt_util.now().astimezone(home_tz)
|
|
search_start, search_end = resolve_search_range(effective_data, now, home_tz)
|
|
|
|
duration_intervals = duration_minutes // INTERVAL_MINUTES
|
|
|
|
# Validate parameter combinations
|
|
validate_price_level_range(min_price_level, max_price_level)
|
|
validate_power_profile_length(power_profile, duration_intervals)
|
|
|
|
_LOGGER.info(
|
|
"%s called: duration=%dmin, range=%s to %s",
|
|
service_label,
|
|
duration_minutes,
|
|
search_start,
|
|
search_end,
|
|
)
|
|
|
|
# Fetch intervals via pool
|
|
api_client = coordinator.api
|
|
user_data = coordinator._cached_user_data # noqa: SLF001
|
|
pool = entry.runtime_data.interval_pool
|
|
|
|
try:
|
|
price_info, _api_called = await pool.get_intervals(
|
|
api_client=api_client,
|
|
user_data=user_data,
|
|
start_time=search_start,
|
|
end_time=search_end,
|
|
)
|
|
except Exception as error:
|
|
_LOGGER.exception("Error fetching price data for %s", service_label)
|
|
raise ServiceValidationError(
|
|
translation_domain=DOMAIN,
|
|
translation_key="price_fetch_failed",
|
|
) from error
|
|
|
|
# Determine currency and unit
|
|
currency = entry.data.get("currency", "EUR")
|
|
unit_factor = 1 if use_base_unit else get_display_unit_factor(entry)
|
|
price_unit = f"{currency}/kWh" if use_base_unit else get_display_unit_string(entry, currency)
|
|
|
|
# --- Attempt with original parameters ---
|
|
effective_duration = duration_intervals
|
|
result, reason = _attempt_find_block(
|
|
price_info,
|
|
max_price_level=max_price_level,
|
|
min_price_level=min_price_level,
|
|
duration_intervals=effective_duration,
|
|
smooth_outliers=smooth_outliers,
|
|
min_distance_from_avg=min_distance_from_avg,
|
|
power_profile=power_profile,
|
|
reverse=reverse,
|
|
)
|
|
|
|
relaxation_applied = False
|
|
relaxation_steps = 0
|
|
|
|
# --- Relaxation loop ---
|
|
if result is None and allow_relaxation:
|
|
max_reduction = calculate_max_duration_reduction_intervals(duration_intervals, duration_flexibility_minutes)
|
|
steps = generate_relaxation_steps(
|
|
min_distance_from_avg=min_distance_from_avg,
|
|
max_price_level=max_price_level,
|
|
min_price_level=min_price_level,
|
|
total_intervals=duration_intervals,
|
|
min_duration_intervals=MIN_RELAXED_DURATION_INTERVALS,
|
|
max_duration_reduction_intervals=max_reduction,
|
|
reverse=reverse,
|
|
)
|
|
for step in steps:
|
|
effective_duration = duration_intervals - step.duration_reduction
|
|
result, reason = _attempt_find_block(
|
|
price_info,
|
|
max_price_level=step.max_price_level,
|
|
min_price_level=step.min_price_level,
|
|
duration_intervals=effective_duration,
|
|
smooth_outliers=smooth_outliers,
|
|
min_distance_from_avg=step.min_distance_from_avg,
|
|
power_profile=power_profile,
|
|
reverse=reverse,
|
|
)
|
|
if result is not None:
|
|
relaxation_applied = True
|
|
relaxation_steps = step.step_number
|
|
effective_duration_minutes = effective_duration * INTERVAL_MINUTES
|
|
_LOGGER.info(
|
|
"%s: relaxation succeeded at step %d (phase=%s, duration=%dmin)",
|
|
service_label,
|
|
step.step_number,
|
|
step.phase,
|
|
effective_duration_minutes,
|
|
)
|
|
break
|
|
else:
|
|
reason = "relaxation_exhausted"
|
|
|
|
if result is None:
|
|
_LOGGER.info(
|
|
"%s: no window found (reason=%s, need %d intervals, have %d in range)",
|
|
service_label,
|
|
reason,
|
|
effective_duration,
|
|
len(price_info),
|
|
)
|
|
response: dict[str, Any] = {
|
|
"home_id": home_id,
|
|
"search_start": search_start.isoformat(),
|
|
"search_end": search_end.isoformat(),
|
|
"must_finish_by": must_finish_by_dt.isoformat() if must_finish_by_dt else None,
|
|
"duration_minutes_requested": duration_minutes_requested,
|
|
"duration_minutes": effective_duration * INTERVAL_MINUTES,
|
|
"currency": currency,
|
|
"price_unit": price_unit,
|
|
"window_found": False,
|
|
"reason": reason,
|
|
"relaxation_applied": relaxation_applied,
|
|
"window": None,
|
|
}
|
|
if relaxation_applied:
|
|
response["relaxation_steps"] = relaxation_steps
|
|
if resolved_refs:
|
|
response["_resolved"] = resolved_refs
|
|
return response
|
|
|
|
# Effective duration may differ from original if relaxation reduced it
|
|
effective_duration_minutes = effective_duration * INTERVAL_MINUTES
|
|
|
|
# Find the opposite-direction window for price comparison (from full unfiltered list)
|
|
comparison_result = find_cheapest_contiguous_window(
|
|
price_info, effective_duration, reverse=not reverse, power_profile=power_profile
|
|
)
|
|
|
|
# Calculate statistics and build response
|
|
stats = calculate_window_statistics(
|
|
result["intervals"], unit_factor=unit_factor, round_decimals=4, power_profile=power_profile
|
|
)
|
|
|
|
# Calculate price comparison (difference to opposite-direction window)
|
|
price_comparison = _compute_price_comparison(
|
|
comparison_result, unit_factor, stats, reverse=reverse, include_details=include_comparison_details
|
|
)
|
|
|
|
# Build interval list with converted prices
|
|
response_intervals = [build_response_interval(iv, unit_factor, rating_lookup) for iv in result["intervals"]]
|
|
|
|
# Calculate end time (last interval start + 15 min)
|
|
last_start = result["intervals"][-1]["startsAt"]
|
|
if isinstance(last_start, str):
|
|
end_time = datetime.fromisoformat(last_start) + timedelta(minutes=INTERVAL_MINUTES)
|
|
else:
|
|
end_time = last_start + timedelta(minutes=INTERVAL_MINUTES)
|
|
|
|
# Calculate seconds until window start for scheduling convenience
|
|
window_start_str = (
|
|
result["intervals"][0]["startsAt"]
|
|
if isinstance(result["intervals"][0]["startsAt"], str)
|
|
else result["intervals"][0]["startsAt"].isoformat()
|
|
)
|
|
window_start_dt = datetime.fromisoformat(window_start_str)
|
|
seconds_until_start = max(0, int((window_start_dt - now).total_seconds()))
|
|
end_time_dt = end_time if isinstance(end_time, datetime) else datetime.fromisoformat(end_time)
|
|
seconds_until_end = max(0, int((end_time_dt - now).total_seconds()))
|
|
|
|
response = {
|
|
"home_id": home_id,
|
|
"search_start": search_start.isoformat(),
|
|
"search_end": search_end.isoformat(),
|
|
"must_finish_by": must_finish_by_dt.isoformat() if must_finish_by_dt else None,
|
|
"duration_minutes_requested": duration_minutes_requested,
|
|
"duration_minutes": effective_duration_minutes,
|
|
"currency": currency,
|
|
"price_unit": price_unit,
|
|
"window_found": True,
|
|
"relaxation_applied": relaxation_applied,
|
|
"window": {
|
|
"start": window_start_str,
|
|
"end": end_time.isoformat() if hasattr(end_time, "isoformat") else end_time,
|
|
"seconds_until_start": seconds_until_start,
|
|
"seconds_until_end": seconds_until_end,
|
|
"duration_minutes": effective_duration_minutes,
|
|
"interval_count": len(result["intervals"]),
|
|
**stats,
|
|
"intervals": response_intervals,
|
|
},
|
|
"price_comparison": price_comparison or None,
|
|
}
|
|
if relaxation_applied:
|
|
response["relaxation_steps"] = relaxation_steps
|
|
if resolved_refs:
|
|
response["_resolved"] = resolved_refs
|
|
|
|
_LOGGER.info(
|
|
"%s: found window at %s, mean=%.4f %s",
|
|
service_label,
|
|
response["window"]["start"],
|
|
stats.get("price_mean", 0) or 0,
|
|
price_unit,
|
|
)
|
|
|
|
return response
|
|
|
|
|
|
async def handle_find_cheapest_block(call: ServiceCall) -> ServiceResponse:
|
|
"""Handle find_cheapest_block service call."""
|
|
return await _handle_find_block(call, reverse=False)
|