diff --git a/custom_components/tibber_prices/services.py b/custom_components/tibber_prices/services.py index 569593d..4893f81 100644 --- a/custom_components/tibber_prices/services.py +++ b/custom_components/tibber_prices/services.py @@ -27,6 +27,182 @@ SERVICE_SCHEMA: Final = vol.Schema( } ) +# region Top-level functions (ordered by call hierarchy) + +# --- Entry point: Service handler --- + + +async def _get_price(call: ServiceCall) -> dict[str, Any]: + """ + Return merged priceInfo and priceRating for the requested day and config entry. + + If 'time' is provided, it must be in HH:mm or HH:mm:ss format and is combined with the selected 'day'. + This only affects 'previous', 'current', and 'next' fields, not the 'prices' list. + If 'time' is not provided, the current time is used for all days. + If 'day' is not provided, the prices list will include today and tomorrow, but stats and interval + selection are only for today. + """ + hass = call.hass + entry_id_raw = call.data.get(ATTR_ENTRY_ID) + if entry_id_raw is None: + raise ServiceValidationError(translation_domain=DOMAIN, translation_key="missing_entry_id") + entry_id: str = str(entry_id_raw) + time_value = call.data.get(ATTR_TIME) + explicit_day = ATTR_DAY in call.data + day = call.data.get(ATTR_DAY) + + entry, coordinator, data = _get_entry_and_data(hass, entry_id) + price_info_data, price_rating_data, hourly_ratings, rating_threshold_percentages, currency = _extract_price_data( + data + ) + + price_info_by_day, day_prefixes, ratings_by_day = _prepare_day_structures(price_info_data, hourly_ratings) + + ( + merged, + stats_merged, + interval_selection_merged, + interval_selection_ratings, + interval_selection_day, + ) = _select_merge_strategy( + explicit_day=explicit_day, + day=day if day is not None else "today", + price_info_by_day=price_info_by_day, + ratings_by_day=ratings_by_day, + ) + + _annotate_intervals_with_times( + merged, + price_info_by_day, + interval_selection_day, + ) + + price_stats = _get_price_stats(stats_merged) + + now, is_simulated = _determine_now_and_simulation(time_value, interval_selection_merged) + + ctx = IntervalContext( + merged=interval_selection_merged, + all_ratings=interval_selection_ratings, + coordinator=coordinator, + day=interval_selection_day, + now=now, + is_simulated=is_simulated, + ) + previous_interval, current_interval, next_interval = _select_intervals(ctx) + + for interval in merged: + if "previous_end_time" in interval: + del interval["previous_end_time"] + + response_ctx = PriceResponseContext( + price_stats=price_stats, + previous_interval=previous_interval, + current_interval=current_interval, + next_interval=next_interval, + currency=currency, + rating_threshold_percentages=rating_threshold_percentages, + merged=merged, + ) + + return _build_price_response(response_ctx) + + +# --- Direct helpers (called by service handler or each other) --- + + +def _get_entry_and_data(hass: HomeAssistant, entry_id: str) -> tuple[Any, Any, dict]: + """Validate entry and extract coordinator and data.""" + if not entry_id: + raise ServiceValidationError(translation_domain=DOMAIN, translation_key="missing_entry_id") + entry = next((e for e in hass.config_entries.async_entries(DOMAIN) if e.entry_id == entry_id), None) + if not entry or not hasattr(entry, "runtime_data") or not entry.runtime_data: + raise ServiceValidationError(translation_domain=DOMAIN, translation_key="invalid_entry_id") + coordinator = entry.runtime_data.coordinator + data = coordinator.data or {} + return entry, coordinator, data + + +def _extract_price_data(data: dict) -> tuple[dict, dict, list, Any, Any]: + """Extract price info and rating data from coordinator data.""" + price_info_data = data.get("priceInfo") or {} + price_rating_data = data.get("priceRating") or {} + hourly_ratings = price_rating_data.get("hourly") or [] + rating_threshold_percentages = price_rating_data.get("thresholdPercentages") + currency = price_rating_data.get("currency") + return price_info_data, price_rating_data, hourly_ratings, rating_threshold_percentages, currency + + +def _prepare_day_structures(price_info_data: dict, hourly_ratings: list) -> tuple[dict, dict, dict]: + """Prepare price info, day prefixes, and ratings by day.""" + price_info_by_day = {d: price_info_data.get(d) or [] for d in ("yesterday", "today", "tomorrow")} + day_prefixes = {d: _get_day_prefixes(price_info_by_day[d]) for d in ("yesterday", "today", "tomorrow")} + ratings_by_day = { + d: [ + r + for r in hourly_ratings + if day_prefixes[d] and r.get("time", r.get("startsAt", "")).startswith(day_prefixes[d][0]) + ] + if price_info_by_day[d] and day_prefixes[d] + else [] + for d in ("yesterday", "today", "tomorrow") + } + return price_info_by_day, day_prefixes, ratings_by_day + + +def _select_merge_strategy( + *, + explicit_day: bool, + day: str, + price_info_by_day: dict, + ratings_by_day: dict, +) -> tuple[list, list, list, list, str]: + """Select merging strategy for intervals and stats.""" + if not explicit_day: + merged_today = _merge_priceinfo_and_pricerating(price_info_by_day["today"], ratings_by_day["today"]) + merged_tomorrow = _merge_priceinfo_and_pricerating(price_info_by_day["tomorrow"], ratings_by_day["tomorrow"]) + merged = merged_today + merged_tomorrow + stats_merged = merged_today + interval_selection_merged = merged_today + interval_selection_ratings = ratings_by_day["today"] + interval_selection_day = "today" + else: + day_key = day if day in ("yesterday", "today", "tomorrow") else "today" + merged = _merge_priceinfo_and_pricerating(price_info_by_day[day_key], ratings_by_day[day_key]) + stats_merged = merged + interval_selection_merged = merged + interval_selection_ratings = ratings_by_day[day_key] + interval_selection_day = day_key + return ( + merged, + stats_merged, + interval_selection_merged, + interval_selection_ratings, + interval_selection_day, + ) + + +def _get_day_prefixes(day_info: list[dict]) -> list[str]: + """Return a list of unique day prefixes from the intervals' start datetimes.""" + prefixes = set() + for interval in day_info: + dt_str = interval.get("time") or interval.get("startsAt") + if not dt_str: + continue + start_dt = dt_util.parse_datetime(dt_str) + if start_dt: + prefixes.add(start_dt.date().isoformat()) + return list(prefixes) + + +def _get_adjacent_start_time(price_info_by_day: dict, day_key: str, *, first: bool) -> str | None: + """Get the start_time from the first/last interval of an adjacent day.""" + info = price_info_by_day.get(day_key) or [] + if not info: + return None + idx = 0 if first else -1 + return info[idx].get("startsAt") + def _merge_priceinfo_and_pricerating(price_info: list[dict], price_rating: list[dict]) -> list[dict]: """ @@ -106,27 +282,95 @@ def _find_next_interval( return None -@dataclass -class IntervalContext: - """ - Context for selecting price intervals. +def _annotate_intervals_with_times( + merged: list[dict], + price_info_by_day: dict, + day: str, +) -> None: + """Annotate merged intervals with end_time and previous_end_time.""" + for idx, interval in enumerate(merged): + # Default: next interval's start_time + if idx + 1 < len(merged): + interval["end_time"] = merged[idx + 1].get("start_time") + # Last interval: look into tomorrow if today, or None otherwise + elif day == "today": + next_start = _get_adjacent_start_time(price_info_by_day, "tomorrow", first=True) + interval["end_time"] = next_start + elif day == "yesterday": + next_start = _get_adjacent_start_time(price_info_by_day, "today", first=True) + interval["end_time"] = next_start + elif day == "tomorrow": + interval["end_time"] = None + else: + interval["end_time"] = None + # First interval: look into yesterday if today, or None otherwise + if idx == 0: + if day == "today": + prev_end = _get_adjacent_start_time(price_info_by_day, "yesterday", first=False) + interval["previous_end_time"] = prev_end + elif day == "tomorrow": + prev_end = _get_adjacent_start_time(price_info_by_day, "today", first=False) + interval["previous_end_time"] = prev_end + elif day == "yesterday": + interval["previous_end_time"] = None + else: + interval["previous_end_time"] = None - Attributes: - merged: List of merged price and rating intervals for the selected day. - all_ratings: All rating intervals for the selected day. - coordinator: Data update coordinator for the integration. - day: The day being queried ('yesterday', 'today', or 'tomorrow'). - now: The datetime used for interval selection. - is_simulated: Whether the time is simulated (from user input) or real. - """ +def _get_price_stats(merged: list[dict]) -> PriceStats: + """Calculate average, min, and max price and their intervals from merged data.""" + if merged: + price_sum = sum(float(interval.get("price", 0)) for interval in merged if "price" in interval) + price_avg = round(price_sum / len(merged), 4) + else: + price_avg = 0 + price_min, price_min_start_time, price_min_end_time = _get_price_stat(merged, "min") + price_max, price_max_start_time, price_max_end_time = _get_price_stat(merged, "max") + return PriceStats( + price_avg=price_avg, + price_min=price_min, + price_min_start_time=price_min_start_time, + price_min_end_time=price_min_end_time, + price_max=price_max, + price_max_start_time=price_max_start_time, + price_max_end_time=price_max_end_time, + stats_merged=merged, + ) - merged: list[dict] - all_ratings: list[dict] - coordinator: Any - day: str - now: datetime - is_simulated: bool + +def _determine_now_and_simulation( + time_value: str | None, interval_selection_merged: list[dict] +) -> tuple[datetime, bool]: + """Determine the 'now' datetime and simulation flag.""" + is_simulated = False + if time_value: + if not interval_selection_merged or not interval_selection_merged[0].get("start_time"): + raise ServiceValidationError( + translation_domain=DOMAIN, + translation_key="no_data_for_day", + ) + day_prefix = interval_selection_merged[0]["start_time"].split("T")[0] + dt_str = f"{day_prefix}T{time_value}" + try: + now = datetime.fromisoformat(dt_str) + except ValueError as exc: + raise ServiceValidationError( + translation_domain=DOMAIN, + translation_key="invalid_time", + translation_placeholders={"error": str(exc)}, + ) from exc + is_simulated = True + elif not interval_selection_merged or not interval_selection_merged[0].get("start_time"): + now = dt_util.now().replace(second=0, microsecond=0) + else: + day_prefix = interval_selection_merged[0]["start_time"].split("T")[0] + current_time = dt_util.now().time().replace(second=0, microsecond=0) + dt_str = f"{day_prefix}T{current_time.isoformat()}" + try: + now = datetime.fromisoformat(dt_str) + except ValueError: + now = dt_util.now().replace(second=0, microsecond=0) + return now, is_simulated def _select_intervals(ctx: IntervalContext) -> tuple[Any, Any, Any]: @@ -176,109 +420,7 @@ def _select_intervals(ctx: IntervalContext) -> tuple[Any, Any, Any]: return previous_interval, current_interval, next_interval -def get_adjacent_start_time(price_info_by_day: dict, day_key: str, *, first: bool) -> str | None: - """Get the start_time from the first/last interval of an adjacent day.""" - info = price_info_by_day.get(day_key) or [] - if not info: - return None - idx = 0 if first else -1 - return info[idx].get("startsAt") - - -def annotate_intervals_with_times( - merged: list[dict], - price_info_by_day: dict, - day: str, -) -> None: - """Annotate merged intervals with end_time and previous_end_time.""" - for idx, interval in enumerate(merged): - # Default: next interval's start_time - if idx + 1 < len(merged): - interval["end_time"] = merged[idx + 1].get("start_time") - # Last interval: look into tomorrow if today, or None otherwise - elif day == "today": - next_start = get_adjacent_start_time(price_info_by_day, "tomorrow", first=True) - interval["end_time"] = next_start - elif day == "yesterday": - next_start = get_adjacent_start_time(price_info_by_day, "today", first=True) - interval["end_time"] = next_start - elif day == "tomorrow": - interval["end_time"] = None - else: - interval["end_time"] = None - # First interval: look into yesterday if today, or None otherwise - if idx == 0: - if day == "today": - prev_end = get_adjacent_start_time(price_info_by_day, "yesterday", first=False) - interval["previous_end_time"] = prev_end - elif day == "tomorrow": - prev_end = get_adjacent_start_time(price_info_by_day, "today", first=False) - interval["previous_end_time"] = prev_end - elif day == "yesterday": - interval["previous_end_time"] = None - else: - interval["previous_end_time"] = None - - -def get_price_stat(merged: list[dict], stat: str) -> tuple[float, str | None, str | None]: - """Return min or max price and its start and end time from merged intervals.""" - if not merged: - return 0, None, None - values = [float(interval.get("price", 0)) for interval in merged if "price" in interval] - if not values: - return 0, None, None - val = min(values) if stat == "min" else max(values) - start_time = next((interval.get("start_time") for interval in merged if interval.get("price") == val), None) - end_time = next((interval.get("end_time") for interval in merged if interval.get("price") == val), None) - return val, start_time, end_time - - -def _get_price_stats(merged: list[dict]) -> PriceStats: - """Calculate average, min, and max price and their intervals from merged data.""" - if merged: - price_sum = sum(float(interval.get("price", 0)) for interval in merged if "price" in interval) - price_avg = round(price_sum / len(merged), 4) - else: - price_avg = 0 - price_min, price_min_start_time, price_min_end_time = get_price_stat(merged, "min") - price_max, price_max_start_time, price_max_end_time = get_price_stat(merged, "max") - return PriceStats( - price_avg=price_avg, - price_min=price_min, - price_min_start_time=price_min_start_time, - price_min_end_time=price_min_end_time, - price_max=price_max, - price_max_start_time=price_max_start_time, - price_max_end_time=price_max_end_time, - stats_merged=merged, - ) - - -@dataclass -class PriceStats: - """Encapsulates price statistics and their intervals for the Tibber Prices service.""" - - price_avg: float - price_min: float - price_min_start_time: str | None - price_min_end_time: str | None - price_max: float - price_max_start_time: str | None - price_max_end_time: str | None - stats_merged: list[dict] - - -@dataclass -class PriceResponseContext: - """Context for building the price response.""" - - price_stats: PriceStats - previous_interval: dict | None - current_interval: dict | None - next_interval: dict | None - currency: str | None - rating_threshold_percentages: Any - merged: list[dict] +# --- Indirect helpers (called by helpers above) --- def _build_price_response(ctx: PriceResponseContext) -> dict[str, Any]: @@ -313,202 +455,77 @@ def _build_price_response(ctx: PriceResponseContext) -> dict[str, Any]: } -async def _get_price(call: ServiceCall) -> dict[str, Any]: +def _get_price_stat(merged: list[dict], stat: str) -> tuple[float, str | None, str | None]: + """Return min or max price and its start and end time from merged intervals.""" + if not merged: + return 0, None, None + values = [float(interval.get("price", 0)) for interval in merged if "price" in interval] + if not values: + return 0, None, None + val = min(values) if stat == "min" else max(values) + start_time = next((interval.get("start_time") for interval in merged if interval.get("price") == val), None) + end_time = next((interval.get("end_time") for interval in merged if interval.get("price") == val), None) + return val, start_time, end_time + + +# endregion + +# region Main classes (dataclasses) + + +@dataclass +class IntervalContext: """ - Return merged priceInfo and priceRating for the requested day and config entry. + Context for selecting price intervals. + + Attributes: + merged: List of merged price and rating intervals for the selected day. + all_ratings: All rating intervals for the selected day. + coordinator: Data update coordinator for the integration. + day: The day being queried ('yesterday', 'today', or 'tomorrow'). + now: The datetime used for interval selection. + is_simulated: Whether the time is simulated (from user input) or real. - If 'time' is provided, it must be in HH:mm or HH:mm:ss format and is combined with the selected 'day'. - This only affects 'previous', 'current', and 'next' fields, not the 'prices' list. - If 'time' is not provided, the current time is used for all days. - If 'day' is not provided, the prices list will include today and tomorrow, but stats and interval - selection are only for today. """ - hass = call.hass - entry_id_raw = call.data.get(ATTR_ENTRY_ID) - if entry_id_raw is None: - raise ServiceValidationError(translation_domain=DOMAIN, translation_key="missing_entry_id") - entry_id: str = str(entry_id_raw) - time_value = call.data.get(ATTR_TIME) - explicit_day = ATTR_DAY in call.data - day = call.data.get(ATTR_DAY) - entry, coordinator, data = _get_entry_and_data(hass, entry_id) - price_info_data, price_rating_data, hourly_ratings, rating_threshold_percentages, currency = _extract_price_data( - data - ) - - price_info_by_day, day_prefixes, ratings_by_day = _prepare_day_structures(price_info_data, hourly_ratings) - - ( - merged, - stats_merged, - interval_selection_merged, - interval_selection_ratings, - interval_selection_day, - ) = _select_merge_strategy( - explicit_day=explicit_day, - day=day if day is not None else "today", - price_info_by_day=price_info_by_day, - ratings_by_day=ratings_by_day, - ) - - annotate_intervals_with_times( - merged, - price_info_by_day, - interval_selection_day, - ) - - price_stats = _get_price_stats(stats_merged) - - now, is_simulated = _determine_now_and_simulation(time_value, interval_selection_merged) - - ctx = IntervalContext( - merged=interval_selection_merged, - all_ratings=interval_selection_ratings, - coordinator=coordinator, - day=interval_selection_day, - now=now, - is_simulated=is_simulated, - ) - previous_interval, current_interval, next_interval = _select_intervals(ctx) - - for interval in merged: - if "previous_end_time" in interval: - del interval["previous_end_time"] - - response_ctx = PriceResponseContext( - price_stats=price_stats, - previous_interval=previous_interval, - current_interval=current_interval, - next_interval=next_interval, - currency=currency, - rating_threshold_percentages=rating_threshold_percentages, - merged=merged, - ) - - return _build_price_response(response_ctx) + merged: list[dict] + all_ratings: list[dict] + coordinator: Any + day: str + now: datetime + is_simulated: bool -def _get_entry_and_data(hass: HomeAssistant, entry_id: str) -> tuple[Any, Any, dict]: - """Validate entry and extract coordinator and data.""" - if not entry_id: - raise ServiceValidationError(translation_domain=DOMAIN, translation_key="missing_entry_id") - entry = next((e for e in hass.config_entries.async_entries(DOMAIN) if e.entry_id == entry_id), None) - if not entry or not hasattr(entry, "runtime_data") or not entry.runtime_data: - raise ServiceValidationError(translation_domain=DOMAIN, translation_key="invalid_entry_id") - coordinator = entry.runtime_data.coordinator - data = coordinator.data or {} - return entry, coordinator, data +@dataclass +class PriceStats: + """Encapsulates price statistics and their intervals for the Tibber Prices service.""" + + price_avg: float + price_min: float + price_min_start_time: str | None + price_min_end_time: str | None + price_max: float + price_max_start_time: str | None + price_max_end_time: str | None + stats_merged: list[dict] -def _extract_price_data(data: dict) -> tuple[dict, dict, list, Any, Any]: - """Extract price info and rating data from coordinator data.""" - price_info_data = data.get("priceInfo") or {} - price_rating_data = data.get("priceRating") or {} - hourly_ratings = price_rating_data.get("hourly") or [] - rating_threshold_percentages = price_rating_data.get("thresholdPercentages") - currency = price_rating_data.get("currency") - return price_info_data, price_rating_data, hourly_ratings, rating_threshold_percentages, currency +@dataclass +class PriceResponseContext: + """Context for building the price response.""" + + price_stats: PriceStats + previous_interval: dict | None + current_interval: dict | None + next_interval: dict | None + currency: str | None + rating_threshold_percentages: Any + merged: list[dict] -def _prepare_day_structures(price_info_data: dict, hourly_ratings: list) -> tuple[dict, dict, dict]: - """Prepare price info, day prefixes, and ratings by day.""" - price_info_by_day = {d: price_info_data.get(d) or [] for d in ("yesterday", "today", "tomorrow")} - day_prefixes = {d: _get_day_prefixes(price_info_by_day[d]) for d in ("yesterday", "today", "tomorrow")} - ratings_by_day = { - d: [ - r - for r in hourly_ratings - if day_prefixes[d] and r.get("time", r.get("startsAt", "")).startswith(day_prefixes[d][0]) - ] - if price_info_by_day[d] and day_prefixes[d] - else [] - for d in ("yesterday", "today", "tomorrow") - } - return price_info_by_day, day_prefixes, ratings_by_day +# endregion - -def _select_merge_strategy( - *, - explicit_day: bool, - day: str, - price_info_by_day: dict, - ratings_by_day: dict, -) -> tuple[list, list, list, list, str]: - """Select merging strategy for intervals and stats.""" - if not explicit_day: - merged_today = _merge_priceinfo_and_pricerating(price_info_by_day["today"], ratings_by_day["today"]) - merged_tomorrow = _merge_priceinfo_and_pricerating(price_info_by_day["tomorrow"], ratings_by_day["tomorrow"]) - merged = merged_today + merged_tomorrow - stats_merged = merged_today - interval_selection_merged = merged_today - interval_selection_ratings = ratings_by_day["today"] - interval_selection_day = "today" - else: - day_key = day if day in ("yesterday", "today", "tomorrow") else "today" - merged = _merge_priceinfo_and_pricerating(price_info_by_day[day_key], ratings_by_day[day_key]) - stats_merged = merged - interval_selection_merged = merged - interval_selection_ratings = ratings_by_day[day_key] - interval_selection_day = day_key - return ( - merged, - stats_merged, - interval_selection_merged, - interval_selection_ratings, - interval_selection_day, - ) - - -def _determine_now_and_simulation( - time_value: str | None, interval_selection_merged: list[dict] -) -> tuple[datetime, bool]: - """Determine the 'now' datetime and simulation flag.""" - is_simulated = False - if time_value: - if not interval_selection_merged or not interval_selection_merged[0].get("start_time"): - raise ServiceValidationError( - translation_domain=DOMAIN, - translation_key="no_data_for_day", - ) - day_prefix = interval_selection_merged[0]["start_time"].split("T")[0] - dt_str = f"{day_prefix}T{time_value}" - try: - now = datetime.fromisoformat(dt_str) - except ValueError as exc: - raise ServiceValidationError( - translation_domain=DOMAIN, - translation_key="invalid_time", - translation_placeholders={"error": str(exc)}, - ) from exc - is_simulated = True - elif not interval_selection_merged or not interval_selection_merged[0].get("start_time"): - now = dt_util.now().replace(second=0, microsecond=0) - else: - day_prefix = interval_selection_merged[0]["start_time"].split("T")[0] - current_time = dt_util.now().time().replace(second=0, microsecond=0) - dt_str = f"{day_prefix}T{current_time.isoformat()}" - try: - now = datetime.fromisoformat(dt_str) - except ValueError: - now = dt_util.now().replace(second=0, microsecond=0) - return now, is_simulated - - -DAY_PREFIX_LENGTH = 10 - - -def _get_day_prefixes(day_info: list[dict]) -> list[str]: - """Return a list of unique day prefixes from the intervals' start datetimes.""" - prefixes = set() - for interval in day_info: - dt_str = interval.get("time") or interval.get("startsAt") - if not dt_str: - continue - start_dt = dt_util.parse_datetime(dt_str) - if start_dt: - prefixes.add(start_dt.date().isoformat()) - return list(prefixes) +# region Service registration @callback @@ -521,3 +538,6 @@ def async_setup_services(hass: HomeAssistant) -> None: schema=SERVICE_SCHEMA, supports_response=SupportsResponse.ONLY, ) + + +# endregion