refactoring

This commit is contained in:
Julian Pawlowski 2025-05-20 20:28:35 +00:00
parent f4a8d68937
commit a375026e07

View file

@ -27,6 +27,182 @@ SERVICE_SCHEMA: Final = vol.Schema(
} }
) )
# region Top-level functions (ordered by call hierarchy)
# --- Entry point: Service handler ---
async def _get_price(call: ServiceCall) -> dict[str, Any]:
"""
Return merged priceInfo and priceRating for the requested day and config entry.
If 'time' is provided, it must be in HH:mm or HH:mm:ss format and is combined with the selected 'day'.
This only affects 'previous', 'current', and 'next' fields, not the 'prices' list.
If 'time' is not provided, the current time is used for all days.
If 'day' is not provided, the prices list will include today and tomorrow, but stats and interval
selection are only for today.
"""
hass = call.hass
entry_id_raw = call.data.get(ATTR_ENTRY_ID)
if entry_id_raw is None:
raise ServiceValidationError(translation_domain=DOMAIN, translation_key="missing_entry_id")
entry_id: str = str(entry_id_raw)
time_value = call.data.get(ATTR_TIME)
explicit_day = ATTR_DAY in call.data
day = call.data.get(ATTR_DAY)
entry, coordinator, data = _get_entry_and_data(hass, entry_id)
price_info_data, price_rating_data, hourly_ratings, rating_threshold_percentages, currency = _extract_price_data(
data
)
price_info_by_day, day_prefixes, ratings_by_day = _prepare_day_structures(price_info_data, hourly_ratings)
(
merged,
stats_merged,
interval_selection_merged,
interval_selection_ratings,
interval_selection_day,
) = _select_merge_strategy(
explicit_day=explicit_day,
day=day if day is not None else "today",
price_info_by_day=price_info_by_day,
ratings_by_day=ratings_by_day,
)
_annotate_intervals_with_times(
merged,
price_info_by_day,
interval_selection_day,
)
price_stats = _get_price_stats(stats_merged)
now, is_simulated = _determine_now_and_simulation(time_value, interval_selection_merged)
ctx = IntervalContext(
merged=interval_selection_merged,
all_ratings=interval_selection_ratings,
coordinator=coordinator,
day=interval_selection_day,
now=now,
is_simulated=is_simulated,
)
previous_interval, current_interval, next_interval = _select_intervals(ctx)
for interval in merged:
if "previous_end_time" in interval:
del interval["previous_end_time"]
response_ctx = PriceResponseContext(
price_stats=price_stats,
previous_interval=previous_interval,
current_interval=current_interval,
next_interval=next_interval,
currency=currency,
rating_threshold_percentages=rating_threshold_percentages,
merged=merged,
)
return _build_price_response(response_ctx)
# --- Direct helpers (called by service handler or each other) ---
def _get_entry_and_data(hass: HomeAssistant, entry_id: str) -> tuple[Any, Any, dict]:
"""Validate entry and extract coordinator and data."""
if not entry_id:
raise ServiceValidationError(translation_domain=DOMAIN, translation_key="missing_entry_id")
entry = next((e for e in hass.config_entries.async_entries(DOMAIN) if e.entry_id == entry_id), None)
if not entry or not hasattr(entry, "runtime_data") or not entry.runtime_data:
raise ServiceValidationError(translation_domain=DOMAIN, translation_key="invalid_entry_id")
coordinator = entry.runtime_data.coordinator
data = coordinator.data or {}
return entry, coordinator, data
def _extract_price_data(data: dict) -> tuple[dict, dict, list, Any, Any]:
"""Extract price info and rating data from coordinator data."""
price_info_data = data.get("priceInfo") or {}
price_rating_data = data.get("priceRating") or {}
hourly_ratings = price_rating_data.get("hourly") or []
rating_threshold_percentages = price_rating_data.get("thresholdPercentages")
currency = price_rating_data.get("currency")
return price_info_data, price_rating_data, hourly_ratings, rating_threshold_percentages, currency
def _prepare_day_structures(price_info_data: dict, hourly_ratings: list) -> tuple[dict, dict, dict]:
"""Prepare price info, day prefixes, and ratings by day."""
price_info_by_day = {d: price_info_data.get(d) or [] for d in ("yesterday", "today", "tomorrow")}
day_prefixes = {d: _get_day_prefixes(price_info_by_day[d]) for d in ("yesterday", "today", "tomorrow")}
ratings_by_day = {
d: [
r
for r in hourly_ratings
if day_prefixes[d] and r.get("time", r.get("startsAt", "")).startswith(day_prefixes[d][0])
]
if price_info_by_day[d] and day_prefixes[d]
else []
for d in ("yesterday", "today", "tomorrow")
}
return price_info_by_day, day_prefixes, ratings_by_day
def _select_merge_strategy(
*,
explicit_day: bool,
day: str,
price_info_by_day: dict,
ratings_by_day: dict,
) -> tuple[list, list, list, list, str]:
"""Select merging strategy for intervals and stats."""
if not explicit_day:
merged_today = _merge_priceinfo_and_pricerating(price_info_by_day["today"], ratings_by_day["today"])
merged_tomorrow = _merge_priceinfo_and_pricerating(price_info_by_day["tomorrow"], ratings_by_day["tomorrow"])
merged = merged_today + merged_tomorrow
stats_merged = merged_today
interval_selection_merged = merged_today
interval_selection_ratings = ratings_by_day["today"]
interval_selection_day = "today"
else:
day_key = day if day in ("yesterday", "today", "tomorrow") else "today"
merged = _merge_priceinfo_and_pricerating(price_info_by_day[day_key], ratings_by_day[day_key])
stats_merged = merged
interval_selection_merged = merged
interval_selection_ratings = ratings_by_day[day_key]
interval_selection_day = day_key
return (
merged,
stats_merged,
interval_selection_merged,
interval_selection_ratings,
interval_selection_day,
)
def _get_day_prefixes(day_info: list[dict]) -> list[str]:
"""Return a list of unique day prefixes from the intervals' start datetimes."""
prefixes = set()
for interval in day_info:
dt_str = interval.get("time") or interval.get("startsAt")
if not dt_str:
continue
start_dt = dt_util.parse_datetime(dt_str)
if start_dt:
prefixes.add(start_dt.date().isoformat())
return list(prefixes)
def _get_adjacent_start_time(price_info_by_day: dict, day_key: str, *, first: bool) -> str | None:
"""Get the start_time from the first/last interval of an adjacent day."""
info = price_info_by_day.get(day_key) or []
if not info:
return None
idx = 0 if first else -1
return info[idx].get("startsAt")
def _merge_priceinfo_and_pricerating(price_info: list[dict], price_rating: list[dict]) -> list[dict]: def _merge_priceinfo_and_pricerating(price_info: list[dict], price_rating: list[dict]) -> list[dict]:
""" """
@ -106,27 +282,95 @@ def _find_next_interval(
return None return None
@dataclass def _annotate_intervals_with_times(
class IntervalContext: merged: list[dict],
""" price_info_by_day: dict,
Context for selecting price intervals. day: str,
) -> None:
"""Annotate merged intervals with end_time and previous_end_time."""
for idx, interval in enumerate(merged):
# Default: next interval's start_time
if idx + 1 < len(merged):
interval["end_time"] = merged[idx + 1].get("start_time")
# Last interval: look into tomorrow if today, or None otherwise
elif day == "today":
next_start = _get_adjacent_start_time(price_info_by_day, "tomorrow", first=True)
interval["end_time"] = next_start
elif day == "yesterday":
next_start = _get_adjacent_start_time(price_info_by_day, "today", first=True)
interval["end_time"] = next_start
elif day == "tomorrow":
interval["end_time"] = None
else:
interval["end_time"] = None
# First interval: look into yesterday if today, or None otherwise
if idx == 0:
if day == "today":
prev_end = _get_adjacent_start_time(price_info_by_day, "yesterday", first=False)
interval["previous_end_time"] = prev_end
elif day == "tomorrow":
prev_end = _get_adjacent_start_time(price_info_by_day, "today", first=False)
interval["previous_end_time"] = prev_end
elif day == "yesterday":
interval["previous_end_time"] = None
else:
interval["previous_end_time"] = None
Attributes:
merged: List of merged price and rating intervals for the selected day.
all_ratings: All rating intervals for the selected day.
coordinator: Data update coordinator for the integration.
day: The day being queried ('yesterday', 'today', or 'tomorrow').
now: The datetime used for interval selection.
is_simulated: Whether the time is simulated (from user input) or real.
""" def _get_price_stats(merged: list[dict]) -> PriceStats:
"""Calculate average, min, and max price and their intervals from merged data."""
if merged:
price_sum = sum(float(interval.get("price", 0)) for interval in merged if "price" in interval)
price_avg = round(price_sum / len(merged), 4)
else:
price_avg = 0
price_min, price_min_start_time, price_min_end_time = _get_price_stat(merged, "min")
price_max, price_max_start_time, price_max_end_time = _get_price_stat(merged, "max")
return PriceStats(
price_avg=price_avg,
price_min=price_min,
price_min_start_time=price_min_start_time,
price_min_end_time=price_min_end_time,
price_max=price_max,
price_max_start_time=price_max_start_time,
price_max_end_time=price_max_end_time,
stats_merged=merged,
)
merged: list[dict]
all_ratings: list[dict] def _determine_now_and_simulation(
coordinator: Any time_value: str | None, interval_selection_merged: list[dict]
day: str ) -> tuple[datetime, bool]:
now: datetime """Determine the 'now' datetime and simulation flag."""
is_simulated: bool is_simulated = False
if time_value:
if not interval_selection_merged or not interval_selection_merged[0].get("start_time"):
raise ServiceValidationError(
translation_domain=DOMAIN,
translation_key="no_data_for_day",
)
day_prefix = interval_selection_merged[0]["start_time"].split("T")[0]
dt_str = f"{day_prefix}T{time_value}"
try:
now = datetime.fromisoformat(dt_str)
except ValueError as exc:
raise ServiceValidationError(
translation_domain=DOMAIN,
translation_key="invalid_time",
translation_placeholders={"error": str(exc)},
) from exc
is_simulated = True
elif not interval_selection_merged or not interval_selection_merged[0].get("start_time"):
now = dt_util.now().replace(second=0, microsecond=0)
else:
day_prefix = interval_selection_merged[0]["start_time"].split("T")[0]
current_time = dt_util.now().time().replace(second=0, microsecond=0)
dt_str = f"{day_prefix}T{current_time.isoformat()}"
try:
now = datetime.fromisoformat(dt_str)
except ValueError:
now = dt_util.now().replace(second=0, microsecond=0)
return now, is_simulated
def _select_intervals(ctx: IntervalContext) -> tuple[Any, Any, Any]: def _select_intervals(ctx: IntervalContext) -> tuple[Any, Any, Any]:
@ -176,109 +420,7 @@ def _select_intervals(ctx: IntervalContext) -> tuple[Any, Any, Any]:
return previous_interval, current_interval, next_interval return previous_interval, current_interval, next_interval
def get_adjacent_start_time(price_info_by_day: dict, day_key: str, *, first: bool) -> str | None: # --- Indirect helpers (called by helpers above) ---
"""Get the start_time from the first/last interval of an adjacent day."""
info = price_info_by_day.get(day_key) or []
if not info:
return None
idx = 0 if first else -1
return info[idx].get("startsAt")
def annotate_intervals_with_times(
merged: list[dict],
price_info_by_day: dict,
day: str,
) -> None:
"""Annotate merged intervals with end_time and previous_end_time."""
for idx, interval in enumerate(merged):
# Default: next interval's start_time
if idx + 1 < len(merged):
interval["end_time"] = merged[idx + 1].get("start_time")
# Last interval: look into tomorrow if today, or None otherwise
elif day == "today":
next_start = get_adjacent_start_time(price_info_by_day, "tomorrow", first=True)
interval["end_time"] = next_start
elif day == "yesterday":
next_start = get_adjacent_start_time(price_info_by_day, "today", first=True)
interval["end_time"] = next_start
elif day == "tomorrow":
interval["end_time"] = None
else:
interval["end_time"] = None
# First interval: look into yesterday if today, or None otherwise
if idx == 0:
if day == "today":
prev_end = get_adjacent_start_time(price_info_by_day, "yesterday", first=False)
interval["previous_end_time"] = prev_end
elif day == "tomorrow":
prev_end = get_adjacent_start_time(price_info_by_day, "today", first=False)
interval["previous_end_time"] = prev_end
elif day == "yesterday":
interval["previous_end_time"] = None
else:
interval["previous_end_time"] = None
def get_price_stat(merged: list[dict], stat: str) -> tuple[float, str | None, str | None]:
"""Return min or max price and its start and end time from merged intervals."""
if not merged:
return 0, None, None
values = [float(interval.get("price", 0)) for interval in merged if "price" in interval]
if not values:
return 0, None, None
val = min(values) if stat == "min" else max(values)
start_time = next((interval.get("start_time") for interval in merged if interval.get("price") == val), None)
end_time = next((interval.get("end_time") for interval in merged if interval.get("price") == val), None)
return val, start_time, end_time
def _get_price_stats(merged: list[dict]) -> PriceStats:
"""Calculate average, min, and max price and their intervals from merged data."""
if merged:
price_sum = sum(float(interval.get("price", 0)) for interval in merged if "price" in interval)
price_avg = round(price_sum / len(merged), 4)
else:
price_avg = 0
price_min, price_min_start_time, price_min_end_time = get_price_stat(merged, "min")
price_max, price_max_start_time, price_max_end_time = get_price_stat(merged, "max")
return PriceStats(
price_avg=price_avg,
price_min=price_min,
price_min_start_time=price_min_start_time,
price_min_end_time=price_min_end_time,
price_max=price_max,
price_max_start_time=price_max_start_time,
price_max_end_time=price_max_end_time,
stats_merged=merged,
)
@dataclass
class PriceStats:
"""Encapsulates price statistics and their intervals for the Tibber Prices service."""
price_avg: float
price_min: float
price_min_start_time: str | None
price_min_end_time: str | None
price_max: float
price_max_start_time: str | None
price_max_end_time: str | None
stats_merged: list[dict]
@dataclass
class PriceResponseContext:
"""Context for building the price response."""
price_stats: PriceStats
previous_interval: dict | None
current_interval: dict | None
next_interval: dict | None
currency: str | None
rating_threshold_percentages: Any
merged: list[dict]
def _build_price_response(ctx: PriceResponseContext) -> dict[str, Any]: def _build_price_response(ctx: PriceResponseContext) -> dict[str, Any]:
@ -313,202 +455,77 @@ def _build_price_response(ctx: PriceResponseContext) -> dict[str, Any]:
} }
async def _get_price(call: ServiceCall) -> dict[str, Any]: def _get_price_stat(merged: list[dict], stat: str) -> tuple[float, str | None, str | None]:
"""Return min or max price and its start and end time from merged intervals."""
if not merged:
return 0, None, None
values = [float(interval.get("price", 0)) for interval in merged if "price" in interval]
if not values:
return 0, None, None
val = min(values) if stat == "min" else max(values)
start_time = next((interval.get("start_time") for interval in merged if interval.get("price") == val), None)
end_time = next((interval.get("end_time") for interval in merged if interval.get("price") == val), None)
return val, start_time, end_time
# endregion
# region Main classes (dataclasses)
@dataclass
class IntervalContext:
""" """
Return merged priceInfo and priceRating for the requested day and config entry. Context for selecting price intervals.
Attributes:
merged: List of merged price and rating intervals for the selected day.
all_ratings: All rating intervals for the selected day.
coordinator: Data update coordinator for the integration.
day: The day being queried ('yesterday', 'today', or 'tomorrow').
now: The datetime used for interval selection.
is_simulated: Whether the time is simulated (from user input) or real.
If 'time' is provided, it must be in HH:mm or HH:mm:ss format and is combined with the selected 'day'.
This only affects 'previous', 'current', and 'next' fields, not the 'prices' list.
If 'time' is not provided, the current time is used for all days.
If 'day' is not provided, the prices list will include today and tomorrow, but stats and interval
selection are only for today.
""" """
hass = call.hass
entry_id_raw = call.data.get(ATTR_ENTRY_ID)
if entry_id_raw is None:
raise ServiceValidationError(translation_domain=DOMAIN, translation_key="missing_entry_id")
entry_id: str = str(entry_id_raw)
time_value = call.data.get(ATTR_TIME)
explicit_day = ATTR_DAY in call.data
day = call.data.get(ATTR_DAY)
entry, coordinator, data = _get_entry_and_data(hass, entry_id) merged: list[dict]
price_info_data, price_rating_data, hourly_ratings, rating_threshold_percentages, currency = _extract_price_data( all_ratings: list[dict]
data coordinator: Any
) day: str
now: datetime
price_info_by_day, day_prefixes, ratings_by_day = _prepare_day_structures(price_info_data, hourly_ratings) is_simulated: bool
(
merged,
stats_merged,
interval_selection_merged,
interval_selection_ratings,
interval_selection_day,
) = _select_merge_strategy(
explicit_day=explicit_day,
day=day if day is not None else "today",
price_info_by_day=price_info_by_day,
ratings_by_day=ratings_by_day,
)
annotate_intervals_with_times(
merged,
price_info_by_day,
interval_selection_day,
)
price_stats = _get_price_stats(stats_merged)
now, is_simulated = _determine_now_and_simulation(time_value, interval_selection_merged)
ctx = IntervalContext(
merged=interval_selection_merged,
all_ratings=interval_selection_ratings,
coordinator=coordinator,
day=interval_selection_day,
now=now,
is_simulated=is_simulated,
)
previous_interval, current_interval, next_interval = _select_intervals(ctx)
for interval in merged:
if "previous_end_time" in interval:
del interval["previous_end_time"]
response_ctx = PriceResponseContext(
price_stats=price_stats,
previous_interval=previous_interval,
current_interval=current_interval,
next_interval=next_interval,
currency=currency,
rating_threshold_percentages=rating_threshold_percentages,
merged=merged,
)
return _build_price_response(response_ctx)
def _get_entry_and_data(hass: HomeAssistant, entry_id: str) -> tuple[Any, Any, dict]: @dataclass
"""Validate entry and extract coordinator and data.""" class PriceStats:
if not entry_id: """Encapsulates price statistics and their intervals for the Tibber Prices service."""
raise ServiceValidationError(translation_domain=DOMAIN, translation_key="missing_entry_id")
entry = next((e for e in hass.config_entries.async_entries(DOMAIN) if e.entry_id == entry_id), None) price_avg: float
if not entry or not hasattr(entry, "runtime_data") or not entry.runtime_data: price_min: float
raise ServiceValidationError(translation_domain=DOMAIN, translation_key="invalid_entry_id") price_min_start_time: str | None
coordinator = entry.runtime_data.coordinator price_min_end_time: str | None
data = coordinator.data or {} price_max: float
return entry, coordinator, data price_max_start_time: str | None
price_max_end_time: str | None
stats_merged: list[dict]
def _extract_price_data(data: dict) -> tuple[dict, dict, list, Any, Any]: @dataclass
"""Extract price info and rating data from coordinator data.""" class PriceResponseContext:
price_info_data = data.get("priceInfo") or {} """Context for building the price response."""
price_rating_data = data.get("priceRating") or {}
hourly_ratings = price_rating_data.get("hourly") or [] price_stats: PriceStats
rating_threshold_percentages = price_rating_data.get("thresholdPercentages") previous_interval: dict | None
currency = price_rating_data.get("currency") current_interval: dict | None
return price_info_data, price_rating_data, hourly_ratings, rating_threshold_percentages, currency next_interval: dict | None
currency: str | None
rating_threshold_percentages: Any
merged: list[dict]
def _prepare_day_structures(price_info_data: dict, hourly_ratings: list) -> tuple[dict, dict, dict]: # endregion
"""Prepare price info, day prefixes, and ratings by day."""
price_info_by_day = {d: price_info_data.get(d) or [] for d in ("yesterday", "today", "tomorrow")}
day_prefixes = {d: _get_day_prefixes(price_info_by_day[d]) for d in ("yesterday", "today", "tomorrow")}
ratings_by_day = {
d: [
r
for r in hourly_ratings
if day_prefixes[d] and r.get("time", r.get("startsAt", "")).startswith(day_prefixes[d][0])
]
if price_info_by_day[d] and day_prefixes[d]
else []
for d in ("yesterday", "today", "tomorrow")
}
return price_info_by_day, day_prefixes, ratings_by_day
# region Service registration
def _select_merge_strategy(
*,
explicit_day: bool,
day: str,
price_info_by_day: dict,
ratings_by_day: dict,
) -> tuple[list, list, list, list, str]:
"""Select merging strategy for intervals and stats."""
if not explicit_day:
merged_today = _merge_priceinfo_and_pricerating(price_info_by_day["today"], ratings_by_day["today"])
merged_tomorrow = _merge_priceinfo_and_pricerating(price_info_by_day["tomorrow"], ratings_by_day["tomorrow"])
merged = merged_today + merged_tomorrow
stats_merged = merged_today
interval_selection_merged = merged_today
interval_selection_ratings = ratings_by_day["today"]
interval_selection_day = "today"
else:
day_key = day if day in ("yesterday", "today", "tomorrow") else "today"
merged = _merge_priceinfo_and_pricerating(price_info_by_day[day_key], ratings_by_day[day_key])
stats_merged = merged
interval_selection_merged = merged
interval_selection_ratings = ratings_by_day[day_key]
interval_selection_day = day_key
return (
merged,
stats_merged,
interval_selection_merged,
interval_selection_ratings,
interval_selection_day,
)
def _determine_now_and_simulation(
time_value: str | None, interval_selection_merged: list[dict]
) -> tuple[datetime, bool]:
"""Determine the 'now' datetime and simulation flag."""
is_simulated = False
if time_value:
if not interval_selection_merged or not interval_selection_merged[0].get("start_time"):
raise ServiceValidationError(
translation_domain=DOMAIN,
translation_key="no_data_for_day",
)
day_prefix = interval_selection_merged[0]["start_time"].split("T")[0]
dt_str = f"{day_prefix}T{time_value}"
try:
now = datetime.fromisoformat(dt_str)
except ValueError as exc:
raise ServiceValidationError(
translation_domain=DOMAIN,
translation_key="invalid_time",
translation_placeholders={"error": str(exc)},
) from exc
is_simulated = True
elif not interval_selection_merged or not interval_selection_merged[0].get("start_time"):
now = dt_util.now().replace(second=0, microsecond=0)
else:
day_prefix = interval_selection_merged[0]["start_time"].split("T")[0]
current_time = dt_util.now().time().replace(second=0, microsecond=0)
dt_str = f"{day_prefix}T{current_time.isoformat()}"
try:
now = datetime.fromisoformat(dt_str)
except ValueError:
now = dt_util.now().replace(second=0, microsecond=0)
return now, is_simulated
DAY_PREFIX_LENGTH = 10
def _get_day_prefixes(day_info: list[dict]) -> list[str]:
"""Return a list of unique day prefixes from the intervals' start datetimes."""
prefixes = set()
for interval in day_info:
dt_str = interval.get("time") or interval.get("startsAt")
if not dt_str:
continue
start_dt = dt_util.parse_datetime(dt_str)
if start_dt:
prefixes.add(start_dt.date().isoformat())
return list(prefixes)
@callback @callback
@ -521,3 +538,6 @@ def async_setup_services(hass: HomeAssistant) -> None:
schema=SERVICE_SCHEMA, schema=SERVICE_SCHEMA,
supports_response=SupportsResponse.ONLY, supports_response=SupportsResponse.ONLY,
) )
# endregion