mirror of
https://github.com/jpawlowski/hass.tibber_prices.git
synced 2026-05-28 18:43:40 +00:00
Accepts battery parameters (capacity, current/target SoC, max power) and returns a cost-minimized charging schedule with per-interval power, SoC progression, and total cost — no manual duration calculation needed. Supports fixed, continuous (min_charge_power_w), and stepped (charge_power_steps_w) charging modes, deadline-aware two-pass planning (must_reach_soc + must_reach_by / must_reach_by_event), and round-trip economics (expected_discharge_price, reserve_for_discharge, max_cost_per_kwh) for arbitrage use cases. Includes min_charge_duration and max_cycles_per_day constraints. Groups deadline fields (must_reach_soc_*, must_reach_by, must_reach_by_event) into a dedicated section so a deadline use case can be configured in one place. Battery section lists capacity before the percent SoC fields that depend on it. Response exposes stable reason codes (already_at_target, energy_unreachable, energy_unreachable_by_ deadline, no_intervals_after_economic_filter, …) documented in the service description and user docs.
973 lines
39 KiB
Python
973 lines
39 KiB
Python
"""Service handler for the plan_charging service."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, time as dt_time, timedelta
|
|
import logging
|
|
from typing import TYPE_CHECKING, Any
|
|
|
|
import voluptuous as vol
|
|
|
|
from custom_components.tibber_prices.const import DOMAIN, get_display_unit_factor, get_display_unit_string
|
|
from custom_components.tibber_prices.utils.price_window import (
|
|
calculate_window_statistics,
|
|
find_cheapest_n_intervals,
|
|
group_intervals_into_segments,
|
|
)
|
|
from homeassistant.exceptions import ServiceValidationError
|
|
from homeassistant.helpers import config_validation as cv
|
|
from homeassistant.util import dt as dt_util
|
|
|
|
from .charging import (
|
|
apply_segment_constraints,
|
|
build_deadline_schedule,
|
|
build_power_schedule,
|
|
build_soc_progression_from_schedule,
|
|
calculate_energy_needed,
|
|
calculate_plan_economics,
|
|
determine_power_mode,
|
|
energy_for_power,
|
|
filter_intervals_by_profitability,
|
|
get_deadline_events,
|
|
resolve_deadline,
|
|
soc_percent_to_kwh,
|
|
)
|
|
from .entity_resolver import or_entity_ref, resolve_entity_references
|
|
from .helpers import (
|
|
INTERVAL_MINUTES,
|
|
PRICE_LEVEL_ORDER,
|
|
VALID_SEARCH_SCOPES,
|
|
apply_must_finish_by,
|
|
build_rating_lookup,
|
|
build_response_interval,
|
|
calculate_search_range_avg,
|
|
check_min_distance_from_avg,
|
|
filter_intervals_by_price_level,
|
|
get_entry_and_data,
|
|
resolve_home_timezone,
|
|
resolve_search_range,
|
|
validate_price_level_range,
|
|
validate_search_params,
|
|
)
|
|
from .relaxation import (
|
|
MIN_RELAXED_DURATION_INTERVALS,
|
|
calculate_max_duration_reduction_intervals,
|
|
generate_relaxation_steps,
|
|
)
|
|
|
|
if TYPE_CHECKING:
|
|
from zoneinfo import ZoneInfo
|
|
|
|
from homeassistant.core import HomeAssistant, ServiceCall, ServiceResponse
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
PLAN_CHARGING_SERVICE_NAME = "plan_charging"
|
|
|
|
_CHARGING_ENTITY_PARAMS: dict[str, type] = {
|
|
"battery_capacity_kwh": float,
|
|
"current_soc_percent": float,
|
|
"current_soc_kwh": float,
|
|
"target_soc_percent": float,
|
|
"target_soc_kwh": float,
|
|
"must_reach_soc_percent": float,
|
|
"must_reach_soc_kwh": float,
|
|
"max_charge_power_w": int,
|
|
"min_charge_power_w": int,
|
|
"grid_import_limit_w": int,
|
|
"charging_efficiency": float,
|
|
"discharging_efficiency": float,
|
|
"expected_discharge_price": float,
|
|
"max_cost_per_kwh": float,
|
|
"min_charge_duration_minutes": int,
|
|
"max_cycles_per_day": int,
|
|
"search_start": datetime,
|
|
"search_end": datetime,
|
|
"search_start_time": dt_time,
|
|
"search_end_time": dt_time,
|
|
"search_start_day_offset": int,
|
|
"search_end_day_offset": int,
|
|
"search_start_offset_minutes": int,
|
|
"search_end_offset_minutes": int,
|
|
"min_distance_from_avg": float,
|
|
"duration_flexibility_minutes": int,
|
|
"must_finish_by": datetime,
|
|
"must_reach_by": datetime,
|
|
}
|
|
|
|
PLAN_CHARGING_SERVICE_SCHEMA = vol.Schema(
|
|
{
|
|
vol.Optional("entry_id", default=""): cv.string,
|
|
vol.Optional("battery_capacity_kwh"): or_entity_ref(
|
|
vol.All(vol.Coerce(float), vol.Range(min=0.1, max=1000.0)),
|
|
),
|
|
vol.Optional("current_soc_percent"): or_entity_ref(
|
|
vol.All(vol.Coerce(float), vol.Range(min=0.0, max=100.0)),
|
|
),
|
|
vol.Optional("current_soc_kwh"): or_entity_ref(vol.All(vol.Coerce(float), vol.Range(min=0.0))),
|
|
vol.Optional("target_soc_percent"): or_entity_ref(
|
|
vol.All(vol.Coerce(float), vol.Range(min=0.0, max=100.0)),
|
|
),
|
|
vol.Optional("target_soc_kwh"): or_entity_ref(vol.All(vol.Coerce(float), vol.Range(min=0.0))),
|
|
vol.Optional("must_reach_soc_percent"): or_entity_ref(
|
|
vol.All(vol.Coerce(float), vol.Range(min=0.0, max=100.0)),
|
|
),
|
|
vol.Optional("must_reach_soc_kwh"): or_entity_ref(vol.All(vol.Coerce(float), vol.Range(min=0.0))),
|
|
vol.Required("max_charge_power_w"): or_entity_ref(
|
|
vol.All(vol.Coerce(int), vol.Range(min=1, max=100000)),
|
|
),
|
|
vol.Optional("min_charge_power_w"): or_entity_ref(
|
|
vol.All(vol.Coerce(int), vol.Range(min=1, max=100000)),
|
|
),
|
|
vol.Optional("charge_power_steps_w"): vol.All(
|
|
[vol.All(vol.Coerce(int), vol.Range(min=1, max=100000))],
|
|
vol.Length(min=1, max=20),
|
|
),
|
|
vol.Optional("grid_import_limit_w"): or_entity_ref(
|
|
vol.All(vol.Coerce(int), vol.Range(min=1, max=100000)),
|
|
),
|
|
vol.Optional("charging_efficiency", default=1.0): or_entity_ref(
|
|
vol.All(vol.Coerce(float), vol.Range(min=0.5, max=1.0)),
|
|
),
|
|
vol.Optional("search_start"): or_entity_ref(cv.datetime),
|
|
vol.Optional("search_end"): or_entity_ref(cv.datetime),
|
|
vol.Optional("search_start_time"): or_entity_ref(cv.time),
|
|
vol.Optional("search_start_day_offset", default=0): or_entity_ref(
|
|
vol.All(vol.Coerce(int), vol.Range(min=-7, max=2)),
|
|
),
|
|
vol.Optional("search_end_time"): or_entity_ref(cv.time),
|
|
vol.Optional("search_end_day_offset", default=0): or_entity_ref(
|
|
vol.All(vol.Coerce(int), vol.Range(min=-7, max=2)),
|
|
),
|
|
vol.Optional("search_start_offset_minutes"): or_entity_ref(
|
|
vol.All(vol.Coerce(int), vol.Range(min=-10080, max=10080)),
|
|
),
|
|
vol.Optional("search_end_offset_minutes"): or_entity_ref(
|
|
vol.All(vol.Coerce(int), vol.Range(min=-10080, max=10080)),
|
|
),
|
|
vol.Optional("search_scope"): vol.In(VALID_SEARCH_SCOPES),
|
|
vol.Optional("include_current_interval", default=True): cv.boolean,
|
|
vol.Optional("must_finish_by"): or_entity_ref(cv.datetime),
|
|
vol.Optional("must_reach_by"): or_entity_ref(cv.datetime),
|
|
vol.Optional("must_reach_by_event"): vol.In(sorted(get_deadline_events())),
|
|
vol.Optional("max_price_level"): vol.In([level.lower() for level in PRICE_LEVEL_ORDER]),
|
|
vol.Optional("min_price_level"): vol.In([level.lower() for level in PRICE_LEVEL_ORDER]),
|
|
vol.Optional("include_comparison_details", default=False): cv.boolean,
|
|
vol.Optional("use_base_unit", default=False): cv.boolean,
|
|
vol.Optional("smooth_outliers", default=True): cv.boolean,
|
|
vol.Optional("min_distance_from_avg"): or_entity_ref(
|
|
vol.All(vol.Coerce(float), vol.Range(min=0.1, max=50.0)),
|
|
),
|
|
vol.Optional("allow_relaxation", default=True): cv.boolean,
|
|
vol.Optional("duration_flexibility_minutes"): or_entity_ref(
|
|
vol.All(vol.Coerce(int), vol.Range(min=0, max=120)),
|
|
),
|
|
vol.Optional("discharging_efficiency", default=1.0): or_entity_ref(
|
|
vol.All(vol.Coerce(float), vol.Range(min=0.5, max=1.0)),
|
|
),
|
|
vol.Optional("expected_discharge_price"): or_entity_ref(
|
|
vol.All(vol.Coerce(float), vol.Range(min=0.0, max=100000.0)),
|
|
),
|
|
vol.Optional("reserve_for_discharge", default=False): cv.boolean,
|
|
vol.Optional("max_cost_per_kwh"): or_entity_ref(
|
|
vol.All(vol.Coerce(float), vol.Range(min=0.0, max=100000.0)),
|
|
),
|
|
vol.Optional("min_charge_duration_minutes"): or_entity_ref(
|
|
vol.All(vol.Coerce(int), vol.Range(min=15, max=240)),
|
|
),
|
|
vol.Optional("max_cycles_per_day"): or_entity_ref(
|
|
vol.All(vol.Coerce(int), vol.Range(min=1, max=20)),
|
|
),
|
|
}
|
|
)
|
|
|
|
|
|
def _interval_start(interval: dict[str, Any]) -> datetime:
|
|
starts_at = interval["startsAt"]
|
|
return datetime.fromisoformat(starts_at) if isinstance(starts_at, str) else starts_at
|
|
|
|
|
|
def _translate_error_key(error_key: str) -> ServiceValidationError:
|
|
return ServiceValidationError(
|
|
translation_domain=DOMAIN,
|
|
translation_key=error_key,
|
|
)
|
|
|
|
|
|
def _resolve_soc_value(
|
|
data: dict[str, Any],
|
|
*,
|
|
percent_key: str,
|
|
kwh_key: str,
|
|
capacity_kwh: float | None,
|
|
field_name: str,
|
|
required: bool,
|
|
) -> float | None:
|
|
has_percent = percent_key in data
|
|
has_kwh = kwh_key in data
|
|
|
|
if has_percent and has_kwh:
|
|
raise ServiceValidationError(
|
|
translation_domain=DOMAIN,
|
|
translation_key="ambiguous_soc_input",
|
|
translation_placeholders={"field": field_name},
|
|
)
|
|
|
|
if not has_percent and not has_kwh:
|
|
if required:
|
|
raise ServiceValidationError(
|
|
translation_domain=DOMAIN,
|
|
translation_key=f"missing_{field_name}",
|
|
)
|
|
return None
|
|
|
|
if has_percent:
|
|
if capacity_kwh is None:
|
|
raise ServiceValidationError(
|
|
translation_domain=DOMAIN,
|
|
translation_key="capacity_required_for_percent",
|
|
)
|
|
return soc_percent_to_kwh(float(data[percent_key]), capacity_kwh)
|
|
|
|
return float(data[kwh_key])
|
|
|
|
|
|
def _validate_soc_inputs(data: dict[str, Any]) -> dict[str, float | None]:
|
|
capacity_kwh = float(data["battery_capacity_kwh"]) if "battery_capacity_kwh" in data else None
|
|
current_soc_kwh = _resolve_soc_value(
|
|
data,
|
|
percent_key="current_soc_percent",
|
|
kwh_key="current_soc_kwh",
|
|
capacity_kwh=capacity_kwh,
|
|
field_name="current_soc",
|
|
required=True,
|
|
)
|
|
target_soc_kwh = _resolve_soc_value(
|
|
data,
|
|
percent_key="target_soc_percent",
|
|
kwh_key="target_soc_kwh",
|
|
capacity_kwh=capacity_kwh,
|
|
field_name="target_soc",
|
|
required=True,
|
|
)
|
|
must_reach_soc_kwh = _resolve_soc_value(
|
|
data,
|
|
percent_key="must_reach_soc_percent",
|
|
kwh_key="must_reach_soc_kwh",
|
|
capacity_kwh=capacity_kwh,
|
|
field_name="must_reach_soc",
|
|
required=False,
|
|
)
|
|
|
|
if capacity_kwh is not None and target_soc_kwh is not None and target_soc_kwh > capacity_kwh + 1e-6:
|
|
raise ServiceValidationError(
|
|
translation_domain=DOMAIN,
|
|
translation_key="target_exceeds_capacity",
|
|
translation_placeholders={
|
|
"target_soc_kwh": f"{target_soc_kwh:.2f}",
|
|
"capacity_kwh": f"{capacity_kwh:.2f}",
|
|
},
|
|
)
|
|
|
|
if must_reach_soc_kwh is not None and "must_reach_by" not in data and "must_reach_by_event" not in data:
|
|
raise ServiceValidationError(
|
|
translation_domain=DOMAIN,
|
|
translation_key="missing_deadline_for_must_reach",
|
|
)
|
|
|
|
if must_reach_soc_kwh is None and ("must_reach_by" in data or "must_reach_by_event" in data):
|
|
raise ServiceValidationError(
|
|
translation_domain=DOMAIN,
|
|
translation_key="missing_must_reach_soc",
|
|
)
|
|
|
|
if current_soc_kwh is not None and target_soc_kwh is not None and must_reach_soc_kwh is not None:
|
|
if must_reach_soc_kwh < current_soc_kwh - 1e-6 or must_reach_soc_kwh > target_soc_kwh + 1e-6:
|
|
raise ServiceValidationError(
|
|
translation_domain=DOMAIN,
|
|
translation_key="invalid_must_reach_soc",
|
|
)
|
|
|
|
return {
|
|
"capacity_kwh": capacity_kwh,
|
|
"current_soc_kwh": current_soc_kwh,
|
|
"target_soc_kwh": target_soc_kwh,
|
|
"must_reach_soc_kwh": must_reach_soc_kwh,
|
|
}
|
|
|
|
|
|
def _build_candidate_intervals(
|
|
price_info: list[dict[str, Any]],
|
|
*,
|
|
max_price_level: str | None,
|
|
min_price_level: str | None,
|
|
smooth_outliers: bool,
|
|
charging_efficiency: float,
|
|
discharging_efficiency: float,
|
|
expected_discharge_price_base: float | None,
|
|
reserve_for_discharge: bool,
|
|
max_cost_per_kwh_base: float | None,
|
|
) -> tuple[list[dict[str, Any]], list[dict[str, Any]], dict[str, Any]]:
|
|
filtered_by_level = filter_intervals_by_price_level(price_info, min_price_level, max_price_level)
|
|
candidates = [dict(interval) for interval in filtered_by_level]
|
|
|
|
if smooth_outliers and candidates:
|
|
from .helpers import smooth_service_intervals # noqa: PLC0415
|
|
|
|
smoothed = smooth_service_intervals([dict(interval) for interval in candidates])
|
|
smoothed_map = {interval["startsAt"]: float(interval["total"]) for interval in smoothed}
|
|
for candidate in candidates:
|
|
candidate["_sort_total"] = smoothed_map.get(candidate["startsAt"], float(candidate["total"]))
|
|
|
|
candidates, economics_filter = filter_intervals_by_profitability(
|
|
candidates,
|
|
charging_efficiency=charging_efficiency,
|
|
discharging_efficiency=discharging_efficiency,
|
|
expected_discharge_price=expected_discharge_price_base,
|
|
reserve_for_discharge=reserve_for_discharge,
|
|
max_cost_per_kwh=max_cost_per_kwh_base,
|
|
)
|
|
return candidates, filtered_by_level, economics_filter
|
|
|
|
|
|
def _build_charging_interval(
|
|
interval: dict[str, Any],
|
|
*,
|
|
unit_factor: int,
|
|
rating_lookup: dict[str, str | None],
|
|
) -> dict[str, Any]:
|
|
response = build_response_interval(interval, unit_factor, rating_lookup)
|
|
response["power_w"] = interval.get("power_w")
|
|
response["grid_energy_kwh"] = interval.get("grid_energy_kwh")
|
|
response["energy_kwh"] = interval.get("energy_kwh")
|
|
response["stored_energy_kwh"] = interval.get("stored_energy_kwh")
|
|
response["soc_after_kwh"] = interval.get("soc_after_kwh")
|
|
if "soc_after_percent" in interval:
|
|
response["soc_after_percent"] = interval["soc_after_percent"]
|
|
return response
|
|
|
|
|
|
def _build_response_segments(
|
|
scheduled_intervals: list[dict[str, Any]],
|
|
*,
|
|
unit_factor: int,
|
|
rating_lookup: dict[str, str | None],
|
|
) -> list[dict[str, Any]]:
|
|
response_segments: list[dict[str, Any]] = []
|
|
for segment in group_intervals_into_segments(scheduled_intervals):
|
|
seg_stats = calculate_window_statistics(
|
|
segment["intervals"],
|
|
unit_factor=unit_factor,
|
|
round_decimals=4,
|
|
power_profile=[int(interval["power_w"]) for interval in segment["intervals"]],
|
|
)
|
|
segment_end = _interval_start(segment["intervals"][-1]) + timedelta(minutes=INTERVAL_MINUTES)
|
|
response_segments.append(
|
|
{
|
|
"start": segment["start"],
|
|
"end": segment_end.isoformat(),
|
|
"duration_minutes": segment["duration_minutes"],
|
|
"interval_count": segment["interval_count"],
|
|
"price_mean": seg_stats.get("price_mean"),
|
|
"intervals": [
|
|
_build_charging_interval(interval, unit_factor=unit_factor, rating_lookup=rating_lookup)
|
|
for interval in segment["intervals"]
|
|
],
|
|
}
|
|
)
|
|
return response_segments
|
|
|
|
|
|
def _build_battery_info(
|
|
*,
|
|
current_soc_kwh: float,
|
|
target_soc_kwh: float,
|
|
capacity_kwh: float | None,
|
|
requested_energy_needed_kwh: float,
|
|
charging_efficiency: float,
|
|
achieved_soc_kwh: float,
|
|
must_reach_soc_kwh: float | None,
|
|
) -> dict[str, Any]:
|
|
info: dict[str, Any] = {
|
|
"current_soc_kwh": round(current_soc_kwh, 4),
|
|
"target_soc_kwh": round(target_soc_kwh, 4),
|
|
"energy_needed_kwh": round(requested_energy_needed_kwh, 4),
|
|
"charging_efficiency": charging_efficiency,
|
|
"achieved_soc_kwh": round(achieved_soc_kwh, 4),
|
|
"target_met": achieved_soc_kwh >= target_soc_kwh - 1e-6,
|
|
}
|
|
if must_reach_soc_kwh is not None:
|
|
info["must_reach_soc_kwh"] = round(must_reach_soc_kwh, 4)
|
|
if capacity_kwh is not None and capacity_kwh > 0:
|
|
info["capacity_kwh"] = round(capacity_kwh, 4)
|
|
info["current_soc_percent"] = round(current_soc_kwh / capacity_kwh * 100.0, 2)
|
|
info["target_soc_percent"] = round(target_soc_kwh / capacity_kwh * 100.0, 2)
|
|
info["achieved_soc_percent"] = round(achieved_soc_kwh / capacity_kwh * 100.0, 2)
|
|
if must_reach_soc_kwh is not None:
|
|
info["must_reach_soc_percent"] = round(must_reach_soc_kwh / capacity_kwh * 100.0, 2)
|
|
return info
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class _PlanContext:
|
|
"""Inputs that stay constant across relaxation attempts."""
|
|
|
|
price_info: list[dict[str, Any]]
|
|
current_soc_kwh: float
|
|
target_soc_kwh: float
|
|
capacity_kwh: float | None
|
|
must_reach_soc_kwh: float | None
|
|
deadline: datetime | None
|
|
charging_efficiency: float
|
|
discharging_efficiency: float
|
|
max_charge_power_w: int
|
|
min_charge_power_w: int | None
|
|
charge_power_steps_w: list[int] | None
|
|
grid_import_limit_w: int | None
|
|
min_charge_duration_minutes: int | None
|
|
max_cycles_per_day: int | None
|
|
smooth_outliers: bool
|
|
expected_discharge_price_base: float | None
|
|
reserve_for_discharge: bool
|
|
max_cost_per_kwh_base: float | None
|
|
unit_factor: int
|
|
|
|
@property
|
|
def economic_filter_active(self) -> bool:
|
|
"""Whether any economic filter param was supplied by the user."""
|
|
return (
|
|
self.expected_discharge_price_base is not None
|
|
or self.max_cost_per_kwh_base is not None
|
|
or self.reserve_for_discharge
|
|
)
|
|
|
|
|
|
def _classify_empty_candidates(
|
|
ctx: _PlanContext,
|
|
filtered_by_level: list[dict[str, Any]],
|
|
economics_filter: dict[str, Any],
|
|
*,
|
|
max_price_level: str | None,
|
|
min_price_level: str | None,
|
|
) -> str:
|
|
"""Return a stable reason code when no candidate intervals remain."""
|
|
level_filter_active = min_price_level is not None or max_price_level is not None
|
|
if not ctx.price_info:
|
|
return "no_data_in_range"
|
|
if level_filter_active and not filtered_by_level:
|
|
return "no_intervals_matching_level_filter"
|
|
if economics_filter.get("filtered_out_by_cost") or economics_filter.get("filtered_out_by_profitability"):
|
|
return "no_intervals_after_economic_filter"
|
|
return "energy_unreachable"
|
|
|
|
|
|
def _build_raw_schedule(
|
|
ctx: _PlanContext,
|
|
candidates: list[dict[str, Any]],
|
|
effective_energy_needed_grid_kwh: float,
|
|
) -> tuple[dict[str, Any] | None, str]:
|
|
"""Run the deadline-aware or single-pass scheduler and return a schedule dict."""
|
|
if ctx.deadline is not None and ctx.must_reach_soc_kwh is not None:
|
|
energy_needed_by_deadline = min(
|
|
effective_energy_needed_grid_kwh,
|
|
calculate_energy_needed(ctx.current_soc_kwh, ctx.must_reach_soc_kwh, ctx.charging_efficiency),
|
|
)
|
|
schedule = build_deadline_schedule(
|
|
candidates,
|
|
total_energy_needed_grid_kwh=effective_energy_needed_grid_kwh,
|
|
energy_needed_by_deadline_grid_kwh=energy_needed_by_deadline,
|
|
deadline=ctx.deadline,
|
|
max_charge_power_w=ctx.max_charge_power_w,
|
|
charging_efficiency=ctx.charging_efficiency,
|
|
min_charge_power_w=ctx.min_charge_power_w,
|
|
charge_power_steps_w=ctx.charge_power_steps_w,
|
|
grid_import_limit_w=ctx.grid_import_limit_w,
|
|
interval_minutes=INTERVAL_MINUTES,
|
|
)
|
|
if schedule["deadline_unallocated_grid_energy_kwh"] > 1e-6:
|
|
return None, "energy_unreachable_by_deadline"
|
|
return schedule, ""
|
|
|
|
schedule = build_power_schedule(
|
|
candidates,
|
|
effective_energy_needed_grid_kwh,
|
|
max_charge_power_w=ctx.max_charge_power_w,
|
|
charging_efficiency=ctx.charging_efficiency,
|
|
min_charge_power_w=ctx.min_charge_power_w,
|
|
charge_power_steps_w=ctx.charge_power_steps_w,
|
|
grid_import_limit_w=ctx.grid_import_limit_w,
|
|
interval_minutes=INTERVAL_MINUTES,
|
|
)
|
|
return schedule, ""
|
|
|
|
|
|
def _selection_passes_distance_check(
|
|
ctx: _PlanContext,
|
|
scheduled_intervals: list[dict[str, Any]],
|
|
min_distance_from_avg: float | None,
|
|
) -> bool:
|
|
"""Return True if the selection meets min_distance_from_avg (or the check is disabled)."""
|
|
if min_distance_from_avg is None or not scheduled_intervals:
|
|
return True
|
|
range_avg = calculate_search_range_avg(ctx.price_info)
|
|
if range_avg is None:
|
|
return True
|
|
selection_mean = sum(float(interval["total"]) for interval in scheduled_intervals) / len(scheduled_intervals)
|
|
return check_min_distance_from_avg(selection_mean, range_avg, min_distance_from_avg, reverse=False)
|
|
|
|
|
|
def _build_deadline_info(
|
|
ctx: _PlanContext,
|
|
scheduled_intervals: list[dict[str, Any]],
|
|
) -> dict[str, Any] | None:
|
|
"""Compute deadline adherence by splitting the constrained intervals at the deadline."""
|
|
if ctx.deadline is None or ctx.must_reach_soc_kwh is None:
|
|
return None
|
|
pre_stored_kwh = sum(
|
|
float(interval.get("stored_energy_kwh", 0.0))
|
|
for interval in scheduled_intervals
|
|
if _interval_start(interval) < ctx.deadline
|
|
)
|
|
achieved_by_deadline = ctx.current_soc_kwh + pre_stored_kwh
|
|
return {
|
|
"must_reach_by": ctx.deadline.isoformat(),
|
|
"must_reach_soc_kwh": round(ctx.must_reach_soc_kwh, 4),
|
|
"achieved_soc_kwh": round(achieved_by_deadline, 4),
|
|
"deadline_met": achieved_by_deadline >= ctx.must_reach_soc_kwh - 1e-6,
|
|
}
|
|
|
|
|
|
def _attempt_plan(
|
|
ctx: _PlanContext,
|
|
*,
|
|
effective_energy_needed_grid_kwh: float,
|
|
max_price_level: str | None,
|
|
min_price_level: str | None,
|
|
min_distance_from_avg: float | None,
|
|
) -> tuple[dict[str, Any] | None, str]:
|
|
"""Run one scheduling attempt with the provided (possibly relaxed) filters."""
|
|
candidates, filtered_by_level, economics_filter = _build_candidate_intervals(
|
|
ctx.price_info,
|
|
max_price_level=max_price_level,
|
|
min_price_level=min_price_level,
|
|
smooth_outliers=ctx.smooth_outliers,
|
|
charging_efficiency=ctx.charging_efficiency,
|
|
discharging_efficiency=ctx.discharging_efficiency,
|
|
expected_discharge_price_base=ctx.expected_discharge_price_base,
|
|
reserve_for_discharge=ctx.reserve_for_discharge,
|
|
max_cost_per_kwh_base=ctx.max_cost_per_kwh_base,
|
|
)
|
|
|
|
if not candidates:
|
|
reason = _classify_empty_candidates(
|
|
ctx,
|
|
filtered_by_level,
|
|
economics_filter,
|
|
max_price_level=max_price_level,
|
|
min_price_level=min_price_level,
|
|
)
|
|
return None, reason
|
|
|
|
schedule, reason = _build_raw_schedule(ctx, candidates, effective_energy_needed_grid_kwh)
|
|
if schedule is None:
|
|
return None, reason
|
|
if schedule["unallocated_grid_energy_kwh"] > 1e-6:
|
|
return None, "energy_unreachable"
|
|
|
|
schedule, warnings = apply_segment_constraints(
|
|
schedule,
|
|
candidates,
|
|
charging_efficiency=ctx.charging_efficiency,
|
|
min_charge_duration_minutes=ctx.min_charge_duration_minutes,
|
|
max_cycles_per_day=ctx.max_cycles_per_day,
|
|
interval_minutes=INTERVAL_MINUTES,
|
|
)
|
|
scheduled_intervals = build_soc_progression_from_schedule(
|
|
schedule["intervals"], ctx.current_soc_kwh, ctx.capacity_kwh
|
|
)
|
|
|
|
if not _selection_passes_distance_check(ctx, scheduled_intervals, min_distance_from_avg):
|
|
return None, "selection_above_distance_threshold"
|
|
|
|
economics = calculate_plan_economics(
|
|
scheduled_intervals,
|
|
charging_efficiency=ctx.charging_efficiency,
|
|
discharging_efficiency=ctx.discharging_efficiency,
|
|
expected_discharge_price=ctx.expected_discharge_price_base,
|
|
unit_factor=ctx.unit_factor,
|
|
max_cost_per_kwh=ctx.max_cost_per_kwh_base,
|
|
reserve_for_discharge=ctx.reserve_for_discharge,
|
|
)
|
|
|
|
return (
|
|
{
|
|
"schedule": schedule,
|
|
"scheduled_intervals": scheduled_intervals,
|
|
"achieved_soc_kwh": ctx.current_soc_kwh + schedule["total_stored_energy_kwh"],
|
|
"deadline": _build_deadline_info(ctx, scheduled_intervals),
|
|
"economics": economics,
|
|
"warnings": warnings,
|
|
"economics_filter": economics_filter if ctx.economic_filter_active else None,
|
|
},
|
|
"",
|
|
)
|
|
|
|
|
|
async def handle_plan_charging(call: ServiceCall) -> ServiceResponse:
|
|
"""Handle the plan_charging service call."""
|
|
hass: HomeAssistant = call.hass
|
|
data, resolved_refs = resolve_entity_references(hass, call.data, _CHARGING_ENTITY_PARAMS)
|
|
|
|
validated = _validate_soc_inputs(data)
|
|
capacity_kwh = validated["capacity_kwh"]
|
|
current_soc_value = validated["current_soc_kwh"]
|
|
target_soc_value = validated["target_soc_kwh"]
|
|
if current_soc_value is None or target_soc_value is None:
|
|
raise ServiceValidationError(
|
|
translation_domain=DOMAIN,
|
|
translation_key="missing_current_soc" if current_soc_value is None else "missing_target_soc",
|
|
)
|
|
|
|
current_soc_kwh = float(current_soc_value)
|
|
target_soc_kwh = float(target_soc_value)
|
|
must_reach_soc_value = validated["must_reach_soc_kwh"]
|
|
must_reach_soc_kwh = float(must_reach_soc_value) if must_reach_soc_value is not None else None
|
|
|
|
entry_id = data.get("entry_id", "")
|
|
max_charge_power_w = int(data["max_charge_power_w"])
|
|
min_charge_power_w = int(data["min_charge_power_w"]) if "min_charge_power_w" in data else None
|
|
charge_power_steps_w = [int(step) for step in data.get("charge_power_steps_w", [])] or None
|
|
grid_import_limit_w = int(data["grid_import_limit_w"]) if "grid_import_limit_w" in data else None
|
|
charging_efficiency = float(data.get("charging_efficiency", 1.0))
|
|
discharging_efficiency = float(data.get("discharging_efficiency", 1.0))
|
|
use_base_unit = bool(data.get("use_base_unit", False))
|
|
max_price_level = data.get("max_price_level")
|
|
min_price_level = data.get("min_price_level")
|
|
include_comparison_details = bool(data.get("include_comparison_details", False))
|
|
smooth_outliers = bool(data.get("smooth_outliers", True))
|
|
min_distance_from_avg = data.get("min_distance_from_avg")
|
|
allow_relaxation = bool(data.get("allow_relaxation", True))
|
|
duration_flexibility_minutes = data.get("duration_flexibility_minutes")
|
|
reserve_for_discharge = bool(data.get("reserve_for_discharge", False))
|
|
min_charge_duration_minutes = (
|
|
int(data["min_charge_duration_minutes"]) if "min_charge_duration_minutes" in data else None
|
|
)
|
|
max_cycles_per_day = int(data["max_cycles_per_day"]) if "max_cycles_per_day" in data else None
|
|
|
|
if current_soc_kwh >= target_soc_kwh - 1e-6:
|
|
entry, _coordinator, _coordinator_data = get_entry_and_data(hass, entry_id)
|
|
currency = entry.data.get("currency", "EUR")
|
|
price_unit = f"{currency}/kWh" if use_base_unit else get_display_unit_string(entry, currency)
|
|
response: dict[str, Any] = {
|
|
"home_id": entry.data.get("home_id", ""),
|
|
"intervals_found": False,
|
|
"reason": "already_at_target",
|
|
"battery": _build_battery_info(
|
|
current_soc_kwh=current_soc_kwh,
|
|
target_soc_kwh=target_soc_kwh,
|
|
capacity_kwh=capacity_kwh,
|
|
requested_energy_needed_kwh=0.0,
|
|
charging_efficiency=charging_efficiency,
|
|
achieved_soc_kwh=current_soc_kwh,
|
|
must_reach_soc_kwh=must_reach_soc_kwh,
|
|
),
|
|
"charging": None,
|
|
"currency": currency,
|
|
"price_unit": price_unit,
|
|
}
|
|
if resolved_refs:
|
|
response["_resolved"] = resolved_refs
|
|
return response
|
|
|
|
requested_energy_needed_grid_kwh = calculate_energy_needed(current_soc_kwh, target_soc_kwh, charging_efficiency)
|
|
|
|
entry, coordinator, coordinator_data = get_entry_and_data(hass, entry_id)
|
|
rating_lookup = build_rating_lookup(coordinator_data)
|
|
home_id = entry.data.get("home_id")
|
|
if not home_id:
|
|
raise ServiceValidationError(
|
|
translation_domain=DOMAIN,
|
|
translation_key="missing_home_id",
|
|
)
|
|
|
|
validate_price_level_range(min_price_level, max_price_level)
|
|
validate_search_params(data)
|
|
home_timezone = resolve_home_timezone(coordinator, home_id)
|
|
from zoneinfo import ZoneInfo # noqa: PLC0415
|
|
|
|
home_tz: ZoneInfo = ZoneInfo(home_timezone)
|
|
effective_data, must_finish_by_dt = apply_must_finish_by(data, home_tz)
|
|
now = dt_util.now().astimezone(home_tz)
|
|
search_start, search_end = resolve_search_range(effective_data, now, home_tz)
|
|
|
|
currency = entry.data.get("currency", "EUR")
|
|
unit_factor = 1 if use_base_unit else get_display_unit_factor(entry)
|
|
price_unit = f"{currency}/kWh" if use_base_unit else get_display_unit_string(entry, currency)
|
|
expected_discharge_price_base = (
|
|
float(data["expected_discharge_price"]) / unit_factor if "expected_discharge_price" in data else None
|
|
)
|
|
max_cost_per_kwh_base = float(data["max_cost_per_kwh"]) / unit_factor if "max_cost_per_kwh" in data else None
|
|
|
|
try:
|
|
_mode, effective_max_power_w, _allowed_steps = determine_power_mode(
|
|
max_charge_power_w=max_charge_power_w,
|
|
min_charge_power_w=min_charge_power_w,
|
|
charge_power_steps_w=charge_power_steps_w,
|
|
grid_import_limit_w=grid_import_limit_w,
|
|
)
|
|
except ValueError as error:
|
|
raise _translate_error_key(str(error)) from error
|
|
|
|
max_interval_energy = energy_for_power(effective_max_power_w, INTERVAL_MINUTES)
|
|
requested_intervals = max(1, int((requested_energy_needed_grid_kwh / max_interval_energy) + 0.999999))
|
|
|
|
try:
|
|
deadline, deadline_source = resolve_deadline(
|
|
coordinator_data=coordinator_data,
|
|
now=now,
|
|
home_tz=home_tz,
|
|
must_reach_by=data.get("must_reach_by"),
|
|
must_reach_by_event=data.get("must_reach_by_event"),
|
|
)
|
|
except ValueError as error:
|
|
raise _translate_error_key(str(error)) from error
|
|
|
|
if deadline is not None and (deadline <= search_start or deadline > search_end):
|
|
raise ServiceValidationError(
|
|
translation_domain=DOMAIN,
|
|
translation_key="deadline_outside_search_range",
|
|
)
|
|
|
|
api_client = coordinator.api
|
|
user_data = coordinator._cached_user_data # noqa: SLF001
|
|
pool = entry.runtime_data.interval_pool
|
|
try:
|
|
price_info, _api_called = await pool.get_intervals(
|
|
api_client=api_client,
|
|
user_data=user_data,
|
|
start_time=search_start,
|
|
end_time=search_end,
|
|
)
|
|
except Exception as error:
|
|
_LOGGER.exception("Error fetching price data for %s", PLAN_CHARGING_SERVICE_NAME)
|
|
raise ServiceValidationError(
|
|
translation_domain=DOMAIN,
|
|
translation_key="price_fetch_failed",
|
|
) from error
|
|
|
|
plan_ctx = _PlanContext(
|
|
price_info=price_info,
|
|
current_soc_kwh=current_soc_kwh,
|
|
target_soc_kwh=target_soc_kwh,
|
|
capacity_kwh=capacity_kwh,
|
|
must_reach_soc_kwh=must_reach_soc_kwh,
|
|
deadline=deadline,
|
|
charging_efficiency=charging_efficiency,
|
|
discharging_efficiency=discharging_efficiency,
|
|
max_charge_power_w=max_charge_power_w,
|
|
min_charge_power_w=min_charge_power_w,
|
|
charge_power_steps_w=charge_power_steps_w,
|
|
grid_import_limit_w=grid_import_limit_w,
|
|
min_charge_duration_minutes=min_charge_duration_minutes,
|
|
max_cycles_per_day=max_cycles_per_day,
|
|
smooth_outliers=smooth_outliers,
|
|
expected_discharge_price_base=expected_discharge_price_base,
|
|
reserve_for_discharge=reserve_for_discharge,
|
|
max_cost_per_kwh_base=max_cost_per_kwh_base,
|
|
unit_factor=unit_factor,
|
|
)
|
|
|
|
planning_result, reason = _attempt_plan(
|
|
plan_ctx,
|
|
effective_energy_needed_grid_kwh=requested_energy_needed_grid_kwh,
|
|
max_price_level=max_price_level,
|
|
min_price_level=min_price_level,
|
|
min_distance_from_avg=min_distance_from_avg,
|
|
)
|
|
|
|
relaxation_applied = False
|
|
relaxation_steps = 0
|
|
warnings: list[str] = []
|
|
|
|
if planning_result is None and allow_relaxation:
|
|
max_reduction = calculate_max_duration_reduction_intervals(requested_intervals, duration_flexibility_minutes)
|
|
steps = generate_relaxation_steps(
|
|
min_distance_from_avg=min_distance_from_avg,
|
|
max_price_level=max_price_level,
|
|
min_price_level=min_price_level,
|
|
total_intervals=requested_intervals,
|
|
min_duration_intervals=max(MIN_RELAXED_DURATION_INTERVALS, 1),
|
|
max_duration_reduction_intervals=max_reduction,
|
|
reverse=False,
|
|
)
|
|
for step in steps:
|
|
reduced_energy = max(
|
|
calculate_energy_needed(current_soc_kwh, must_reach_soc_kwh, charging_efficiency)
|
|
if must_reach_soc_kwh is not None
|
|
else 0.0,
|
|
requested_energy_needed_grid_kwh - (step.duration_reduction * max_interval_energy),
|
|
)
|
|
attempt, reason = _attempt_plan(
|
|
plan_ctx,
|
|
effective_energy_needed_grid_kwh=reduced_energy,
|
|
max_price_level=step.max_price_level,
|
|
min_price_level=step.min_price_level,
|
|
min_distance_from_avg=step.min_distance_from_avg,
|
|
)
|
|
if attempt is not None:
|
|
planning_result = attempt
|
|
relaxation_applied = True
|
|
relaxation_steps = step.step_number
|
|
warnings.append("target_reduced_by_relaxation")
|
|
break
|
|
|
|
if planning_result is None:
|
|
response: dict[str, Any] = {
|
|
"home_id": home_id,
|
|
"search_start": search_start.isoformat(),
|
|
"search_end": search_end.isoformat(),
|
|
"must_finish_by": must_finish_by_dt.isoformat() if must_finish_by_dt else None,
|
|
"intervals_found": False,
|
|
"reason": reason,
|
|
"battery": _build_battery_info(
|
|
current_soc_kwh=current_soc_kwh,
|
|
target_soc_kwh=target_soc_kwh,
|
|
capacity_kwh=capacity_kwh,
|
|
requested_energy_needed_kwh=requested_energy_needed_grid_kwh,
|
|
charging_efficiency=charging_efficiency,
|
|
achieved_soc_kwh=current_soc_kwh,
|
|
must_reach_soc_kwh=must_reach_soc_kwh,
|
|
),
|
|
"charging": None,
|
|
"deadline": {"must_reach_by": deadline.isoformat(), "source": deadline_source} if deadline else None,
|
|
"economics": None,
|
|
"currency": currency,
|
|
"price_unit": price_unit,
|
|
"relaxation_applied": relaxation_applied,
|
|
}
|
|
if relaxation_applied:
|
|
response["relaxation_steps"] = relaxation_steps
|
|
if resolved_refs:
|
|
response["_resolved"] = resolved_refs
|
|
return response
|
|
|
|
scheduled_intervals = planning_result["scheduled_intervals"]
|
|
schedule_data = planning_result["schedule"]
|
|
warnings.extend(planning_result.get("warnings", []))
|
|
achieved_soc_kwh = float(planning_result["achieved_soc_kwh"])
|
|
|
|
response_intervals = [
|
|
_build_charging_interval(interval, unit_factor=unit_factor, rating_lookup=rating_lookup)
|
|
for interval in scheduled_intervals
|
|
]
|
|
response_segments = _build_response_segments(
|
|
scheduled_intervals,
|
|
unit_factor=unit_factor,
|
|
rating_lookup=rating_lookup,
|
|
)
|
|
power_profile = [int(interval["power_w"]) for interval in scheduled_intervals]
|
|
stats = calculate_window_statistics(
|
|
scheduled_intervals,
|
|
unit_factor=unit_factor,
|
|
round_decimals=4,
|
|
power_profile=power_profile,
|
|
)
|
|
total_cost_base = sum(
|
|
float(interval["total"]) * float(interval["grid_energy_kwh"]) for interval in scheduled_intervals
|
|
)
|
|
avg_price_per_kwh_base = (
|
|
total_cost_base / schedule_data["total_grid_energy_kwh"] if schedule_data["total_grid_energy_kwh"] > 0 else 0.0
|
|
)
|
|
|
|
comparison_result = find_cheapest_n_intervals(price_info, len(scheduled_intervals), 1, reverse=True)
|
|
price_comparison: dict[str, Any] = {}
|
|
if comparison_result is not None:
|
|
comparison_stats = calculate_window_statistics(
|
|
comparison_result["intervals"],
|
|
unit_factor=unit_factor,
|
|
round_decimals=4,
|
|
)
|
|
own_mean = stats.get("price_mean")
|
|
comparison_mean = comparison_stats.get("price_mean")
|
|
if own_mean is not None and comparison_mean is not None:
|
|
price_comparison = {
|
|
"comparison_price_mean": comparison_mean,
|
|
"price_difference": abs(round(float(comparison_mean) - float(own_mean), 4)),
|
|
}
|
|
if include_comparison_details:
|
|
price_comparison["comparison_price_min"] = comparison_stats.get("price_min")
|
|
price_comparison["comparison_price_max"] = comparison_stats.get("price_max")
|
|
|
|
seconds_until_start = None
|
|
seconds_until_end = None
|
|
if response_segments:
|
|
first_dt = datetime.fromisoformat(response_segments[0]["start"])
|
|
last_dt = datetime.fromisoformat(response_segments[-1]["end"])
|
|
seconds_until_start = max(0, int((first_dt - now).total_seconds()))
|
|
seconds_until_end = max(0, int((last_dt - now).total_seconds()))
|
|
|
|
battery_info = _build_battery_info(
|
|
current_soc_kwh=current_soc_kwh,
|
|
target_soc_kwh=target_soc_kwh,
|
|
capacity_kwh=capacity_kwh,
|
|
requested_energy_needed_kwh=requested_energy_needed_grid_kwh,
|
|
charging_efficiency=charging_efficiency,
|
|
achieved_soc_kwh=achieved_soc_kwh,
|
|
must_reach_soc_kwh=must_reach_soc_kwh,
|
|
)
|
|
if relaxation_applied and achieved_soc_kwh < target_soc_kwh - 1e-6:
|
|
battery_info["target_met"] = False
|
|
|
|
deadline_info = planning_result.get("deadline")
|
|
if deadline_info is not None:
|
|
deadline_info = dict(deadline_info)
|
|
deadline_info["source"] = deadline_source
|
|
if capacity_kwh is not None and capacity_kwh > 0:
|
|
deadline_info["achieved_soc_percent"] = round(deadline_info["achieved_soc_kwh"] / capacity_kwh * 100.0, 2)
|
|
deadline_info["must_reach_soc_percent"] = round(
|
|
deadline_info["must_reach_soc_kwh"] / capacity_kwh * 100.0, 2
|
|
)
|
|
|
|
response: dict[str, Any] = {
|
|
"home_id": home_id,
|
|
"search_start": search_start.isoformat(),
|
|
"search_end": search_end.isoformat(),
|
|
"must_finish_by": must_finish_by_dt.isoformat() if must_finish_by_dt else None,
|
|
"intervals_found": True,
|
|
"currency": currency,
|
|
"price_unit": price_unit,
|
|
"battery": battery_info,
|
|
"charging": {
|
|
"mode": schedule_data["mode"],
|
|
"charge_power_w": max_charge_power_w,
|
|
"min_charge_power_w": min_charge_power_w,
|
|
"charge_power_steps_w": charge_power_steps_w,
|
|
"grid_import_limit_w": grid_import_limit_w,
|
|
"effective_max_charge_power_w": schedule_data["effective_max_power_w"],
|
|
"total_duration_minutes": len(scheduled_intervals) * INTERVAL_MINUTES,
|
|
"total_energy_kwh": round(schedule_data["total_grid_energy_kwh"], 6),
|
|
"stored_energy_kwh": round(schedule_data["total_stored_energy_kwh"], 6),
|
|
"total_cost": round(total_cost_base * unit_factor, 4),
|
|
"avg_price_per_kwh": round(avg_price_per_kwh_base * unit_factor, 4),
|
|
"schedule": {
|
|
"segment_count": len(response_segments),
|
|
"segments": response_segments,
|
|
"intervals": response_intervals,
|
|
"seconds_until_start": seconds_until_start,
|
|
"seconds_until_end": seconds_until_end,
|
|
**stats,
|
|
},
|
|
},
|
|
"deadline": deadline_info,
|
|
"economics": planning_result.get("economics"),
|
|
"economics_filter": planning_result.get("economics_filter"),
|
|
"price_comparison": price_comparison or None,
|
|
"relaxation_applied": relaxation_applied,
|
|
"warnings": warnings or None,
|
|
}
|
|
if relaxation_applied:
|
|
response["relaxation_steps"] = relaxation_steps
|
|
if resolved_refs:
|
|
response["_resolved"] = resolved_refs
|
|
return response
|