This commit is contained in:
Julian Pawlowski 2025-05-20 17:57:49 +00:00
parent 3d37ace85e
commit 76d2e2bb2b
5 changed files with 427 additions and 6 deletions

View file

@ -19,6 +19,7 @@ from .api import TibberPricesApiClient
from .const import DOMAIN, LOGGER, SCAN_INTERVAL, async_load_translations
from .coordinator import STORAGE_VERSION, TibberPricesDataUpdateCoordinator
from .data import TibberPricesData
from .services import async_setup_services
if TYPE_CHECKING:
from homeassistant.core import HomeAssistant
@ -44,6 +45,9 @@ async def async_setup_entry(
if hass.config.language and hass.config.language != "en":
await async_load_translations(hass, hass.config.language)
# Register services when a config entry is loaded
async_setup_services(hass)
# Use the defined SCAN_INTERVAL constant for consistent polling
coordinator = TibberPricesDataUpdateCoordinator(
hass=hass,
@ -80,6 +84,12 @@ async def async_unload_entry(
if unload_ok and entry.runtime_data is not None:
await entry.runtime_data.coordinator.async_shutdown()
# Unregister services if this was the last config entry
if not hass.config_entries.async_entries(DOMAIN):
for service in ("get_priceinfo", "get_pricerating"):
if hass.services.has_service(DOMAIN, service):
hass.services.async_remove(DOMAIN, service)
return unload_ok

View file

@ -280,13 +280,25 @@ def _flatten_price_info(subscription: dict) -> dict:
def _flatten_price_rating(subscription: dict) -> dict:
"""Extract and flatten priceRating from subscription."""
"""Extract and flatten priceRating from subscription, including currency."""
price_rating = subscription.get("priceRating", {})
def extract_entries_and_currency(rating: dict) -> tuple[list, str | None]:
if rating is None:
return [], None
return rating.get("entries", []), rating.get("currency")
hourly_entries, hourly_currency = extract_entries_and_currency(price_rating.get("hourly"))
daily_entries, daily_currency = extract_entries_and_currency(price_rating.get("daily"))
monthly_entries, monthly_currency = extract_entries_and_currency(price_rating.get("monthly"))
# Prefer hourly, then daily, then monthly for top-level currency
currency = hourly_currency or daily_currency or monthly_currency
return {
"hourly": price_rating.get("hourly", {}).get("entries", []),
"daily": price_rating.get("daily", {}).get("entries", []),
"monthly": price_rating.get("monthly", {}).get("entries", []),
"hourly": hourly_entries,
"daily": daily_entries,
"monthly": monthly_entries,
"thresholdPercentages": price_rating.get("thresholdPercentages"),
"currency": currency,
}
@ -380,6 +392,7 @@ class TibberPricesApiClient:
"priceRating": {
"daily": _flatten_price_rating(subscription)["daily"],
"thresholdPercentages": _flatten_price_rating(subscription)["thresholdPercentages"],
"currency": _flatten_price_rating(subscription)["currency"],
}
}
@ -406,6 +419,7 @@ class TibberPricesApiClient:
"priceRating": {
"hourly": _flatten_price_rating(subscription)["hourly"],
"thresholdPercentages": _flatten_price_rating(subscription)["thresholdPercentages"],
"currency": _flatten_price_rating(subscription)["currency"],
}
}
@ -432,6 +446,7 @@ class TibberPricesApiClient:
"priceRating": {
"monthly": _flatten_price_rating(subscription)["monthly"],
"thresholdPercentages": _flatten_price_rating(subscription)["thresholdPercentages"],
"currency": _flatten_price_rating(subscription)["currency"],
}
}

View file

@ -566,12 +566,13 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict]):
price_rating = data.get("priceRating", data)
threshold = price_rating.get("thresholdPercentages")
entries = price_rating.get(rating_type, [])
currency = price_rating.get("currency")
except KeyError as ex:
LOGGER.error("Failed to extract rating data (flat format): %s", ex)
raise TibberPricesApiClientError(
TibberPricesApiClientError.EMPTY_DATA_ERROR.format(query_type=rating_type)
) from ex
return {"priceRating": {rating_type: entries, "thresholdPercentages": threshold}}
return {"priceRating": {rating_type: entries, "thresholdPercentages": threshold, "currency": currency}}
@callback
def _merge_all_cached_data(self) -> dict:
@ -583,7 +584,7 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict]):
merged = {"priceInfo": self._cached_price_data["priceInfo"]}
else:
merged = {"priceInfo": self._cached_price_data}
price_rating = {"hourly": [], "daily": [], "monthly": [], "thresholdPercentages": None}
price_rating = {"hourly": [], "daily": [], "monthly": [], "thresholdPercentages": None, "currency": None}
for rating_type, cached in zip(
["hourly", "daily", "monthly"],
[self._cached_rating_data_hourly, self._cached_rating_data_daily, self._cached_rating_data_monthly],
@ -594,7 +595,10 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict]):
price_rating[rating_type] = entries
if not price_rating["thresholdPercentages"]:
price_rating["thresholdPercentages"] = cached["priceRating"].get("thresholdPercentages")
if not price_rating["currency"]:
price_rating["currency"] = cached["priceRating"].get("currency")
merged["priceRating"] = price_rating
merged["currency"] = price_rating["currency"]
return merged
async def _async_initialize(self) -> None:

View file

@ -0,0 +1,360 @@
"""Services for Tibber Prices integration."""
from __future__ import annotations
from datetime import datetime
from typing import Any, Final
import voluptuous as vol
from homeassistant.core import HomeAssistant, ServiceCall, SupportsResponse, callback
from homeassistant.exceptions import ServiceValidationError
from homeassistant.util import dt as dt_util
from .const import DOMAIN
PRICE_SERVICE_NAME = "get_price"
ATTR_DAY: Final = "day"
ATTR_ENTRY_ID: Final = "entry_id"
ATTR_TIME: Final = "time"
SERVICE_SCHEMA: Final = vol.Schema(
{
vol.Required(ATTR_ENTRY_ID): str,
vol.Optional(ATTR_DAY, default="today"): vol.In(["yesterday", "today", "tomorrow"]),
vol.Optional(ATTR_TIME): vol.Match(r"^(\d{2}:\d{2}(:\d{2})?)$"), # HH:mm or HH:mm:ss
}
)
def _merge_priceinfo_and_pricerating(price_info: list[dict], price_rating: list[dict]) -> list[dict]:
"""
Merge priceInfo and priceRating intervals by timestamp, prefixing rating fields.
Also rename startsAt to start_time. Preserves item order.
"""
rating_by_time = {(r.get("time") or r.get("startsAt")): r for r in price_rating or []}
merged = []
for interval in price_info or []:
ts = interval.get("startsAt")
merged_interval = {"start_time": ts} if ts is not None else {}
for k, v in interval.items():
if k == "startsAt":
continue
if k == "total":
merged_interval["price"] = v
merged_interval["price_ct"] = round(v * 100, 2)
elif k not in ("energy", "tax"):
merged_interval[k] = v
rating = rating_by_time.get(ts)
if rating:
for k, v in rating.items():
if k in ("time", "startsAt", "total", "tax", "energy"):
continue
if k == "difference":
merged_interval["rating_difference_%"] = v
else:
merged_interval[f"rating_{k}"] = v
merged.append(merged_interval)
return merged
def _find_previous_interval(
merged: list[dict],
all_ratings: list[dict],
coordinator: Any,
day: str,
) -> Any:
"""Find previous interval from previous day if needed."""
if merged and day == "today":
yday_info = coordinator.data["priceInfo"].get("yesterday") or []
if yday_info:
yday_ratings = [
r
for r in all_ratings
if r.get("time", r.get("startsAt", "")).startswith(_get_day_prefixes(yday_info)[0])
]
yday_merged = _merge_priceinfo_and_pricerating(yday_info, yday_ratings)
if yday_merged:
return yday_merged[-1]
return None
def _find_next_interval(
merged: list[dict],
all_ratings: list[dict],
coordinator: Any,
day: str,
) -> Any:
"""Find next interval from next day if needed."""
if merged and day == "today":
tmrw_info = coordinator.data["priceInfo"].get("tomorrow") or []
if tmrw_info:
tmrw_ratings = [
r
for r in all_ratings
if r.get("time", r.get("startsAt", "")).startswith(_get_day_prefixes(tmrw_info)[0])
]
tmrw_merged = _merge_priceinfo_and_pricerating(tmrw_info, tmrw_ratings)
if tmrw_merged:
return tmrw_merged[0]
return None
def _select_intervals(
merged: list[dict], all_ratings: list[dict], coordinator: Any, day: str, now: datetime, *, is_simulated: bool
) -> tuple[Any, Any, Any]:
"""
Select previous, current, and next intervals for the given day and time.
If is_simulated is True, always calculate previous/current/next for all days, but:
- For 'yesterday', never fetch previous from the day before yesterday.
- For 'tomorrow', never fetch next from the day after tomorrow.
If is_simulated is False, previous/current/next are None for 'yesterday' and 'tomorrow'.
"""
if not merged:
return None, None, None
if not is_simulated and day in ("yesterday", "tomorrow"):
return None, None, None
idx = None
for i, interval in enumerate(merged):
start_time = interval.get("start_time")
if not start_time:
continue
start_dt = dt_util.parse_datetime(start_time)
if start_dt is None:
try:
start_dt = datetime.fromisoformat(start_time)
except ValueError:
continue
if start_dt.tzinfo is None:
start_dt = dt_util.as_local(start_dt)
cmp_now = now
if cmp_now.tzinfo is None:
cmp_now = dt_util.as_local(cmp_now)
if start_dt <= cmp_now:
idx = i
if start_dt > cmp_now:
break
previous_interval = None
current_interval = None
next_interval = None
if idx is None:
next_interval = merged[0]
else:
current_interval = merged[idx]
previous_interval = merged[idx - 1] if idx > 0 else None
if idx + 1 < len(merged):
next_interval = merged[idx + 1]
# For today, allow previous/next from adjacent days
if day == "today":
if idx is not None and idx == 0:
previous_interval = _find_previous_interval(merged, all_ratings, coordinator, day)
if idx is not None and idx == len(merged) - 1:
next_interval = _find_next_interval(merged, all_ratings, coordinator, day)
return previous_interval, current_interval, next_interval
def get_adjacent_start_time(price_info_by_day: dict, day_key: str, *, first: bool) -> str | None:
"""Get the start_time from the first/last interval of an adjacent day."""
info = price_info_by_day.get(day_key) or []
if not info:
return None
idx = 0 if first else -1
return info[idx].get("startsAt")
def annotate_intervals_with_times(
merged: list[dict],
price_info_by_day: dict,
day: str,
) -> None:
"""Annotate merged intervals with end_time and previous_end_time."""
for idx, interval in enumerate(merged):
# Default: next interval's start_time
if idx + 1 < len(merged):
interval["end_time"] = merged[idx + 1].get("start_time")
# Last interval: look into tomorrow if today, or None otherwise
elif day == "today":
next_start = get_adjacent_start_time(price_info_by_day, "tomorrow", first=True)
interval["end_time"] = next_start
elif day == "yesterday":
next_start = get_adjacent_start_time(price_info_by_day, "today", first=True)
interval["end_time"] = next_start
elif day == "tomorrow":
interval["end_time"] = None
else:
interval["end_time"] = None
# First interval: look into yesterday if today, or None otherwise
if idx == 0:
if day == "today":
prev_end = get_adjacent_start_time(price_info_by_day, "yesterday", first=False)
interval["previous_end_time"] = prev_end
elif day == "tomorrow":
prev_end = get_adjacent_start_time(price_info_by_day, "today", first=False)
interval["previous_end_time"] = prev_end
elif day == "yesterday":
interval["previous_end_time"] = None
else:
interval["previous_end_time"] = None
def get_price_stat(merged: list[dict], stat: str) -> tuple[float, str | None, str | None]:
"""Return min or max price and its start and end time from merged intervals."""
if not merged:
return 0, None, None
values = [float(interval.get("price", 0)) for interval in merged if "price" in interval]
if not values:
return 0, None, None
val = min(values) if stat == "min" else max(values)
start_time = next((interval.get("start_time") for interval in merged if interval.get("price") == val), None)
end_time = next((interval.get("end_time") for interval in merged if interval.get("price") == val), None)
return val, start_time, end_time
async def _get_price(call: ServiceCall) -> dict[str, Any]:
"""
Return merged priceInfo and priceRating for the requested day and config entry.
If 'time' is provided, it must be in HH:mm or HH:mm:ss format and is combined with the selected 'day'.
This only affects 'previous', 'current', and 'next' fields, not the 'prices' list.
If 'time' is not provided, the current time is used for all days.
"""
hass = call.hass
day = call.data.get(ATTR_DAY, "today")
entry_id = call.data.get(ATTR_ENTRY_ID)
time_value = call.data.get(ATTR_TIME)
if not entry_id:
raise ServiceValidationError(translation_domain=DOMAIN, translation_key="missing_entry_id")
entry = next((e for e in hass.config_entries.async_entries(DOMAIN) if e.entry_id == entry_id), None)
if not entry or not hasattr(entry, "runtime_data") or not entry.runtime_data:
raise ServiceValidationError(translation_domain=DOMAIN, translation_key="invalid_entry_id")
coordinator = entry.runtime_data.coordinator
data = coordinator.data or {}
price_info_data = data.get("priceInfo") or {}
price_rating_data = data.get("priceRating") or {}
hourly_ratings = price_rating_data.get("hourly") or []
rating_threshold_percentages = price_rating_data.get("thresholdPercentages")
currency = price_rating_data.get("currency")
# Fetch all relevant day data once
price_info_by_day = {d: price_info_data.get(d) or [] for d in ("yesterday", "today", "tomorrow")}
day_prefixes = {d: _get_day_prefixes(price_info_by_day[d]) for d in ("yesterday", "today", "tomorrow")}
ratings_by_day = {
d: [
r
for r in hourly_ratings
if day_prefixes[d] and r.get("time", r.get("startsAt", "")).startswith(day_prefixes[d][0])
]
if price_info_by_day[d] and day_prefixes[d]
else []
for d in ("yesterday", "today", "tomorrow")
}
price_info = price_info_by_day[day]
all_ratings = ratings_by_day[day]
merged = _merge_priceinfo_and_pricerating(price_info, all_ratings)
annotate_intervals_with_times(merged, price_info_by_day, day)
price_avg = (
round(sum(float(interval.get("price", 0)) for interval in merged if "price" in interval) / len(merged), 4)
if merged
else 0
)
price_min, price_min_start_time, price_min_end_time = get_price_stat(merged, "min")
price_max, price_max_start_time, price_max_end_time = get_price_stat(merged, "max")
if time_value:
if not price_info or not price_info[0].get("startsAt"):
raise ServiceValidationError(
translation_domain=DOMAIN,
translation_key="no_data_for_day",
)
day_prefix = price_info[0]["startsAt"].split("T")[0]
dt_str = f"{day_prefix}T{time_value}"
try:
now = datetime.fromisoformat(dt_str)
except ValueError as exc:
raise ServiceValidationError(
translation_domain=DOMAIN,
translation_key="invalid_simulate_time",
translation_placeholders={"error": str(exc)},
) from exc
is_simulated = True
else:
if not price_info or not price_info[0].get("startsAt"):
now = dt_util.now().replace(second=0, microsecond=0)
else:
day_prefix = price_info[0]["startsAt"].split("T")[0]
current_time = dt_util.now().time().replace(second=0, microsecond=0)
dt_str = f"{day_prefix}T{current_time.isoformat()}"
try:
now = datetime.fromisoformat(dt_str)
except ValueError:
now = dt_util.now().replace(second=0, microsecond=0)
is_simulated = True
previous_interval, current_interval, next_interval = _select_intervals(
merged, ratings_by_day[day], coordinator, day, now, is_simulated=is_simulated
)
# Remove 'previous_end_time' from output intervals
for interval in merged:
if "previous_end_time" in interval:
del interval["previous_end_time"]
return {
"average": {
"start_time": merged[0].get("start_time") if merged else None,
"end_time": merged[0].get("end_time") if merged else None,
"price": price_avg,
"price_ct": round(price_avg * 100, 2),
},
"minimum": {
"start_time": price_min_start_time,
"end_time": price_min_end_time,
"price": price_min,
"price_ct": round(price_min * 100, 2),
},
"maximum": {
"start_time": price_max_start_time,
"end_time": price_max_end_time,
"price": price_max,
"price_ct": round(price_max * 100, 2),
},
"previous": previous_interval,
"current": current_interval,
"next": next_interval,
"currency": currency,
"rating_threshold_%": rating_threshold_percentages,
"prices": merged,
}
def _get_day_prefixes(price_info: list[dict]) -> list[str]:
"""Get ISO date prefixes for the requested day from price_info intervals."""
prefixes = set()
for interval in price_info:
ts = interval.get("startsAt")
if ts and "T" in ts:
prefixes.add(ts.split("T")[0])
return list(prefixes)
@callback
def async_setup_services(hass: HomeAssistant) -> None:
"""Set up services for Tibber Prices integration."""
hass.services.async_register(
DOMAIN,
PRICE_SERVICE_NAME,
_get_price,
schema=SERVICE_SCHEMA,
supports_response=SupportsResponse.ONLY,
)

View file

@ -0,0 +1,32 @@
get_price:
name: Get Tibber Price Info
description: >-
Returns merged priceInfo and priceRating for the requested day and config entry. Optionally, you can simulate the response as if it were a specific time-of-day (time, format HH:mm or HH:mm:ss), which only affects the previous/current/next fields, not the prices list. The simulated time is always combined with the selected day.
fields:
entry_id:
name: Entry ID
description: The config entry ID for the Tibber integration.
required: true
example: "1234567890abcdef"
selector:
config_entry:
integration: tibber_prices
day:
name: Day
description: Which day to fetch prices for (yesterday, today, or tomorrow).
required: false
default: today
example: today
selector:
select:
options:
- yesterday
- today
- tomorrow
time:
name: Time
description: >-
Time-of-day in HH:mm or HH:mm:ss format. If provided, simulates the response as if this were the current time for interval selection (previous/current/next) for the selected day. Does not filter the prices list. Example: "15:00" or "15:00:00".
required: false
selector:
time: