hass.tibber_prices/custom_components/tibber_prices/services/common.py
Julian Pawlowski 95950f48c1 feat(services): add find_best_start and plan_charging services
Add two new service actions for intelligent device scheduling:

- find_best_start: Find optimal start time for run-once appliances
  - Considers price, optional energy estimates, optional PV power
  - Supports flexible time windows (HH:MM, ISO datetime, with/without timezone)
  - Prefers future candidates over past ones
  - Includes current interval by default (configurable)
  - Returns recommended start time with cost analysis

- plan_charging: Create optimized charging schedule for energy storage
  - Supports EV, home battery, balcony battery use cases
  - Energy target or duration-based planning
  - Split or continuous charging modes
  - Efficiency factor support
  - Includes current interval by default (configurable)
  - Returns detailed slot-by-slot charging plan

Common improvements:
- Flexible datetime parsing (ISO 8601, with/without timezone, microseconds)
- Time selector in GUI (better UX than text field)
- Currency display based on config entry settings
- Comprehensive error handling and validation
- Detailed response envelopes with warnings/errors

Impact: Users can automate appliance scheduling based on electricity
prices without external automation rules.
2026-01-20 11:47:26 +00:00

444 lines
14 KiB
Python

"""
Common utilities for planning services (find_best_start, plan_charging).
This module provides shared functionality for:
- Response envelope building
- Window parsing (datetime vs HH:MM)
- Time rounding and normalization
- Currency formatting
"""
from __future__ import annotations
import re
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Any
from custom_components.tibber_prices.const import get_currency_info
from homeassistant.util import dt as dt_util
if TYPE_CHECKING:
from homeassistant.core import HomeAssistant
# Version string reported in the "version" field of every response envelope.
SERVICE_API_VERSION = "0.1"
DEFAULT_HORIZON_HOURS = 36  # Default look-ahead for parse_window when no end is given (hours)
RESOLUTION_MINUTES = 15  # Length of one price interval in minutes (quarter-hourly)
# Threshold used by round_to_quarter in "nearest" mode: minute remainders below
# this value round down, all others round up (15/2 rounded up).
RESOLUTION_HALF = 8
@dataclass
class ServiceResponse:
    """
    Response envelope shared by the planning services.

    Every field has a default, so callers can create an empty envelope and
    fill it in step by step. Use ``to_dict`` to obtain the structure that is
    handed back as the service response.
    """

    # Overall success flag of the service call.
    ok: bool = True
    # Name of the service that produced this response.
    service: str = ""
    # Envelope version; defaults to SERVICE_API_VERSION.
    version: str = SERVICE_API_VERSION
    # Creation timestamp as a string.
    generated_at: str = ""
    # Config entry the response belongs to.
    entry_id: str = ""
    # Interval resolution in minutes; defaults to RESOLUTION_MINUTES.
    resolution_minutes: int = RESOLUTION_MINUTES
    # Currency code and the symbol of its subunit.
    currency: str = "EUR"
    currency_subunit: str = "ct"
    # Window bounds as strings; emitted nested under "window" by to_dict.
    window_start: str = ""
    window_end: str = ""
    # Warning / error codes and the service-specific payload.
    warnings: list[str] = field(default_factory=list)
    errors: list[str] = field(default_factory=list)
    result: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """
        Convert the envelope to a plain dictionary for the service response.

        The two window fields are grouped under a nested ``"window"`` mapping
        with ``"start"`` and ``"end"`` keys. The ``warnings``, ``errors`` and
        ``result`` containers are passed through as-is (not copied).
        """
        metadata: dict[str, Any] = {
            "ok": self.ok,
            "service": self.service,
            "version": self.version,
            "generated_at": self.generated_at,
            "entry_id": self.entry_id,
            "resolution_minutes": self.resolution_minutes,
            "currency": self.currency,
            "currency_subunit": self.currency_subunit,
        }
        window = {"start": self.window_start, "end": self.window_end}
        # Key order matches the field order: metadata, window, then outcome.
        return {
            **metadata,
            "window": window,
            "warnings": self.warnings,
            "errors": self.errors,
            "result": self.result,
        }
@dataclass
class ParsedWindow:
    """
    Parsed and validated time window.

    Bundles the resolved window bounds with the warning and error codes that
    were collected while resolving them.
    """

    # Resolved start of the window.
    start: datetime
    # Resolved end of the window.
    end: datetime
    # Warning codes recorded while parsing (e.g. "window_end_clamped_to_horizon").
    warnings: list[str] = field(default_factory=list)
    # Error codes recorded while parsing (e.g. "invalid_parameters").
    errors: list[str] = field(default_factory=list)
def create_response_envelope(
    service_name: str,
    entry_id: str,
    currency: str,
    window_start: datetime,
    window_end: datetime,
) -> ServiceResponse:
    """
    Build a response envelope with the fields shared by all planning services.

    Args:
        service_name: Full service name (e.g., "tibber_prices.find_best_start")
        entry_id: Config entry ID
        currency: Currency code (e.g., "EUR")
        window_start: Start of time window
        window_end: End of time window

    Returns:
        ServiceResponse with the common fields populated; all remaining
        fields keep their dataclass defaults.

    """
    # Only the second element (the subunit symbol) of the currency info is needed.
    _unused_first, subunit_symbol, _unused_last = get_currency_info(currency)
    generated_at = dt_util.now().isoformat()

    envelope = ServiceResponse(
        service=service_name,
        generated_at=generated_at,
        entry_id=entry_id,
        currency=currency,
        currency_subunit=subunit_symbol,
        window_start=window_start.isoformat(),
        window_end=window_end.isoformat(),
    )
    return envelope
def is_hhmm_format(time_str: str) -> bool:
    """
    Tell whether a string is a bare time of day (no date part).

    Accepted shapes are "H:MM"/"HH:MM", optionally followed by ":SS" and
    optionally by fractional seconds, e.g. "14:00", "14:00:00" or
    "14:00:00.123456". Surrounding whitespace is ignored. The time selector
    in HA services.yaml returns HH:MM:SS format (e.g., "14:00:00").

    Args:
        time_str: Time string to check

    Returns:
        True for a time-only string, False otherwise (including empty input)

    """
    if time_str:
        # Hours 0-23 (leading zero optional), minutes 00-59, optional seconds
        # 00-59 with optional fractional digits.
        matched = re.match(
            r"^([01]?[0-9]|2[0-3]):([0-5][0-9])(?::([0-5][0-9])(?:\.(\d+))?)?$",
            time_str.strip(),
        )
        return matched is not None
    return False
def parse_datetime_string(time_str: str) -> datetime | None:
"""
Parse a datetime string flexibly, accepting many common formats.
Supports:
- ISO 8601 with timezone: "2025-12-28T14:00:00+01:00"
- ISO 8601 with Z: "2025-12-28T14:00:00Z"
- ISO 8601 without timezone: "2025-12-28T14:00:00"
- With microseconds: "2025-12-28T14:00:00.123456+01:00"
- Date with space separator: "2025-12-28 14:00:00"
- Without seconds: "2025-12-28T14:00" or "2025-12-28 14:00"
Args:
time_str: Datetime string in various formats
Returns:
Parsed datetime (timezone-aware in HA timezone) or None if unparseable
"""
if not time_str:
return None
time_str = time_str.strip()
# Replace Z with +00:00 for UTC
if time_str.endswith("Z"):
time_str = time_str[:-1] + "+00:00"
# Replace space with T for ISO compatibility
# Handle "2025-12-28 14:00:00" format
if " " in time_str and "T" not in time_str:
time_str = time_str.replace(" ", "T", 1)
# Try parsing with fromisoformat (handles most ISO 8601 variants)
try:
parsed = datetime.fromisoformat(time_str)
except ValueError:
parsed = None
if parsed is not None:
# Add timezone if missing
if parsed.tzinfo is None:
parsed = dt_util.as_local(parsed)
return parsed
# Try additional formats that fromisoformat might not handle
# Format without seconds: "2025-12-28T14:00"
try:
if len(time_str) == 16 and time_str[10] == "T": # noqa: PLR2004
parsed = datetime.strptime(time_str, "%Y-%m-%dT%H:%M") # noqa: DTZ007
return dt_util.as_local(parsed)
except ValueError:
pass
return None
def parse_hhmm_to_datetime(time_str: str, reference_time: datetime) -> datetime:
    """
    Resolve a time-of-day string to its next occurrence after a reference.

    Only the hour and minute parts are used; a seconds part, if present, is
    ignored because results are rounded to 15-minute boundaries anyway.

    Args:
        time_str: Time in HH:MM or HH:MM:SS format
        reference_time: Reference datetime (typically now)

    Returns:
        ``reference_time`` with the given hour/minute (seconds and
        microseconds zeroed), moved one day ahead when that moment is not
        strictly after ``reference_time``.

    """
    pieces = time_str.strip().split(":")
    same_day = reference_time.replace(
        hour=int(pieces[0]),
        minute=int(pieces[1]),
        second=0,
        microsecond=0,
    )
    # A moment equal to or before the reference counts as already passed.
    return same_day if same_day > reference_time else same_day + timedelta(days=1)
def parse_window(  # noqa: PLR0912, PLR0915
    _hass: HomeAssistant,
    start_input: str | datetime | None,
    end_input: str | datetime | None,
    horizon_hours: int = DEFAULT_HORIZON_HOURS,
    duration_minutes: int | None = None,
) -> ParsedWindow:
    """
    Parse window start/end with automatic format detection.

    Supports:
    - HH:MM or HH:MM:SS format (e.g., "14:00", "14:00:00") - time-only, next occurrence
    - ISO datetime strings with/without timezone (e.g., "2025-12-28T14:00:00+01:00")
    - ISO datetime with Z for UTC (e.g., "2025-12-28T14:00:00Z")
    - Datetime with space separator (e.g., "2025-12-28 14:00:00")
    - With or without microseconds
    - datetime objects (naive ones are converted with ``dt_util.as_local``)

    Args:
        _hass: Home Assistant instance (not used by this function)
        start_input: Start time (HH:MM, ISO string, datetime, or None for now)
        end_input: End time (HH:MM, ISO string, datetime, or None for horizon)
        horizon_hours: Maximum look-ahead hours (default: 36)
        duration_minutes: Optional duration - if provided and longer than the
            window, the window end is extended to fit it (before the horizon
            clamp is applied)

    Returns:
        ParsedWindow with start/end datetimes and any warnings/errors.
        Possible error codes: "invalid_parameters" (unsupported type or
        unparseable string) and "window_end_before_start" (end is not after
        start, either as given or after clamping to the horizon).
        Possible warning codes: "window_extended_for_duration" and
        "window_end_clamped_to_horizon".

    """
    warnings: list[str] = []
    errors: list[str] = []

    now = dt_util.now()
    horizon_end = now + timedelta(hours=horizon_hours)

    # Parse start
    start: datetime
    start_is_hhmm = False
    if start_input is None:
        start = now
    elif isinstance(start_input, datetime):
        start = start_input
        if start.tzinfo is None:
            start = dt_util.as_local(start)
    elif isinstance(start_input, str):
        if is_hhmm_format(start_input):
            # Time-only start resolves to its next occurrence after now.
            start = parse_hhmm_to_datetime(start_input, now)
            start_is_hhmm = True
        else:
            # Try to parse as datetime string (flexible format)
            parsed = parse_datetime_string(start_input)
            if parsed is None:
                errors.append("invalid_parameters")
                return ParsedWindow(start=now, end=horizon_end, errors=errors)
            start = parsed
    else:
        errors.append("invalid_parameters")
        return ParsedWindow(start=now, end=horizon_end, errors=errors)

    # Parse end
    end: datetime
    end_is_hhmm = False
    if end_input is None:
        end = horizon_end
    elif isinstance(end_input, datetime):
        end = end_input
        if end.tzinfo is None:
            end = dt_util.as_local(end)
    elif isinstance(end_input, str):
        if is_hhmm_format(end_input):
            end_is_hhmm = True
            end_parts = end_input.strip().split(":")
            end_hour = int(end_parts[0])
            end_minute = int(end_parts[1])

            # A time-only end is anchored to the start's calendar day.
            end = start.replace(hour=end_hour, minute=end_minute, second=0, microsecond=0)

            # Midnight wrap: an end at the same time of day (-> +24h) or
            # earlier in the day than the start belongs to the next day.
            start_time_of_day = start.hour * 60 + start.minute
            end_time_of_day = end_hour * 60 + end_minute
            if end_time_of_day <= start_time_of_day:
                end += timedelta(days=1)
        else:
            # Try to parse as datetime string (flexible format)
            parsed = parse_datetime_string(end_input)
            if parsed is None:
                errors.append("invalid_parameters")
                return ParsedWindow(start=start, end=horizon_end, errors=errors)
            end = parsed
    else:
        errors.append("invalid_parameters")
        return ParsedWindow(start=start, end=horizon_end, errors=errors)

    # Validate: end < start only allowed for HH:MM (already handled above)
    if end <= start and not (start_is_hhmm and end_is_hhmm):
        errors.append("window_end_before_start")
        return ParsedWindow(start=start, end=end, errors=errors)

    # Extend the window if it is too short for the requested duration
    if duration_minutes is not None:
        window_duration = (end - start).total_seconds() / 60
        if duration_minutes > window_duration:
            extension_minutes = duration_minutes - window_duration
            end = end + timedelta(minutes=extension_minutes)
            warnings.append("window_extended_for_duration")

    # Clamp end to horizon
    if end > horizon_end:
        end = horizon_end
        warnings.append("window_end_clamped_to_horizon")
        # A start at or beyond the horizon leaves an empty/inverted window
        # after clamping. Report it with the same code that the default-end
        # path produces for such a start instead of returning it silently.
        if end <= start:
            errors.append("window_end_before_start")

    return ParsedWindow(start=start, end=end, warnings=warnings, errors=errors)
def round_to_quarter(dt: datetime, mode: str = "ceil") -> datetime:
    """
    Round datetime to 15-minute boundary.

    Args:
        dt: Datetime to round
        mode: "nearest", "floor", or "ceil"; any other value is treated as
            "nearest"

    Returns:
        Rounded datetime with seconds and microseconds set to zero.
        - "floor" never returns a value after ``dt``.
        - "ceil" never returns a value before ``dt``: a value that sits on a
          quarter minute but has non-zero seconds or microseconds is moved to
          the next quarter.
        - "nearest" decides on the minute only: remainders below
          RESOLUTION_HALF round down, the rest round up.

    """
    minute = dt.minute
    quarter, remainder = divmod(minute, 15)

    if mode == "floor":
        new_minute = quarter * 15
    elif mode == "ceil":
        # Sub-minute parts count too, otherwise e.g. 14:15:30 would be
        # "rounded up" to 14:15:00, which lies before the input.
        on_boundary = remainder == 0 and dt.second == 0 and dt.microsecond == 0
        new_minute = minute if on_boundary else (quarter + 1) * 15
    else:  # nearest
        new_minute = quarter * 15 if remainder < RESOLUTION_HALF else (quarter + 1) * 15

    # Rounding up from the last quarter yields minute 60: carry into the hour.
    hour_add, new_minute = divmod(new_minute, 60)

    result = dt.replace(minute=new_minute, second=0, microsecond=0)
    if hour_add:
        result += timedelta(hours=hour_add)
    return result
def format_price(price: float, decimals: int = 4) -> float:
    """
    Round a price for output.

    Args:
        price: Price value to round
        decimals: Number of decimal places to keep (default: 4)

    Returns:
        The price rounded with the built-in ``round``

    """
    rounded = round(price, decimals)
    return rounded
def price_to_subunit(price_eur: float) -> float:
    """
    Convert a price from the main currency unit to its subunit (e.g., EUR to ct).

    Args:
        price_eur: Price in the main currency unit

    Returns:
        The price multiplied by 100, rounded to two decimal places

    """
    in_subunit = price_eur * 100
    return round(in_subunit, 2)
def get_intervals_in_window(
    all_intervals: list[dict[str, Any]],
    window_start: datetime,
    window_end: datetime,
) -> list[dict[str, Any]]:
    """
    Select the intervals whose start time lies inside a window.

    Entries without a truthy "startsAt" value are skipped. A string
    "startsAt" is parsed with ``datetime.fromisoformat``; any other value is
    compared as-is.

    Args:
        all_intervals: List of all available price intervals
        window_start: Start of window (inclusive)
        window_end: End of window (exclusive)

    Returns:
        The matching interval dicts, in their original order

    """

    def _starts_in_window(entry: dict[str, Any]) -> bool:
        raw_start = entry.get("startsAt")
        if not raw_start:
            return False
        moment = datetime.fromisoformat(raw_start) if isinstance(raw_start, str) else raw_start
        # Half-open range: start is included, end is excluded.
        return window_start <= moment < window_end

    return [entry for entry in all_intervals if _starts_in_window(entry)]
def calculate_intervals_needed(duration_minutes: int, rounding: str = "ceil") -> int:
    """
    Work out how many price intervals a duration spans.

    The duration is divided by RESOLUTION_MINUTES and the quotient is turned
    into a whole number according to ``rounding``.

    Args:
        duration_minutes: Duration in minutes
        rounding: "nearest", "floor", or "ceil"; any other value is treated
            as "nearest"

    Returns:
        Number of intervals

    """
    ratio = duration_minutes / RESOLUTION_MINUTES

    if rounding not in ("floor", "ceil"):
        # nearest
        return round(ratio)

    whole = int(ratio)
    # "ceil" adds one interval whenever the quotient has a fractional part.
    if rounding == "ceil" and ratio != whole:
        whole += 1
    return whole