refactoring for QUARTER_HOURLY prices

This commit is contained in:
Julian Pawlowski 2025-11-02 20:22:29 +00:00
parent 8c61292acf
commit 4f6d429132
5 changed files with 39 additions and 200 deletions

View file

@@ -14,7 +14,7 @@ from homeassistant.const import PERCENTAGE, EntityCategory
from homeassistant.util import dt as dt_util
from .entity import TibberPricesEntity
from .sensor import detect_interval_granularity, find_price_data_for_interval
from .sensor import find_price_data_for_interval
if TYPE_CHECKING:
from collections.abc import Callable
@@ -32,9 +32,8 @@ from .const import (
DEFAULT_PEAK_PRICE_FLEX,
)
MIN_TOMORROW_INTERVALS_HOURLY = 24
MINUTES_PER_INTERVAL = 15
MIN_TOMORROW_INTERVALS_15MIN = 96
TOMORROW_INTERVAL_COUNTS = {MIN_TOMORROW_INTERVALS_HOURLY, MIN_TOMORROW_INTERVALS_15MIN}
ENTITY_DESCRIPTIONS = (
BinarySensorEntityDescription(
@@ -167,7 +166,7 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
price_info = self.coordinator.data.get("priceInfo", {})
tomorrow_prices = price_info.get("tomorrow", [])
interval_count = len(tomorrow_prices)
if interval_count in TOMORROW_INTERVAL_COUNTS:
if interval_count == MIN_TOMORROW_INTERVALS_15MIN:
return True
if interval_count == 0:
return False
@@ -182,7 +181,7 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
interval_count = len(tomorrow_prices)
if interval_count == 0:
status = "none"
elif interval_count in TOMORROW_INTERVAL_COUNTS:
elif interval_count == MIN_TOMORROW_INTERVALS_15MIN:
status = "full"
else:
status = "partial"
@@ -217,11 +216,7 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
now = dt_util.now()
# Detect interval granularity
interval_length = detect_interval_granularity(today_prices)
# Find price data for current interval
current_interval_data = find_price_data_for_interval({"today": today_prices}, now, interval_length)
current_interval_data = find_price_data_for_interval({"today": today_prices}, now)
if not current_interval_data:
return None
@@ -238,13 +233,11 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
"""Annotate a single interval with all required attributes for Home Assistant UI and automations."""
interval_copy = interval.copy()
interval_remaining = annotation_ctx["interval_count"] - annotation_ctx["interval_idx"]
# Extract interval-related fields for attribute ordering and clarity
interval_start = interval_copy.pop("interval_start", None)
interval_end = interval_copy.pop("interval_end", None)
interval_hour = interval_copy.pop("interval_hour", None)
interval_minute = interval_copy.pop("interval_minute", None)
interval_time = interval_copy.pop("interval_time", None)
interval_length_minute = interval_copy.pop("interval_length_minute", annotation_ctx["interval_length"])
price = interval_copy.pop("price", None)
new_interval = {
"period_start": annotation_ctx["period_start"],
@@ -253,7 +246,7 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
"minute": annotation_ctx["period_start_minute"],
"time": annotation_ctx["period_start_time"],
"period_length_minute": annotation_ctx["period_length"],
"period_remaining_minute_after_interval": interval_remaining * annotation_ctx["interval_length"],
"period_remaining_minute_after_interval": interval_remaining * MINUTES_PER_INTERVAL,
"periods_total": annotation_ctx["period_count"],
"periods_remaining": annotation_ctx["periods_remaining"],
"period_position": annotation_ctx["period_idx"],
@@ -265,10 +258,8 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
"interval_hour": interval_hour,
"interval_minute": interval_minute,
"interval_time": interval_time,
"interval_length_minute": interval_length_minute,
"price": price,
}
# Merge any extra fields from the original interval (future-proofing)
new_interval.update(interval_copy)
new_interval["price_minor"] = round(new_interval["price"] * 100, 2)
price_diff = new_interval["price"] - annotation_ctx["ref_price"]
@@ -298,7 +289,6 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
periods: list[list[dict]],
ref_prices: dict,
avg_price_by_day: dict,
interval_length: int,
) -> list[dict]:
"""
Return flattened and annotated intervals with period info and requested properties.
@@ -333,7 +323,7 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
period_start_time = f"{period_start_hour:02d}:{period_start_minute:02d}" if period_start else None
period_end = period[-1]["interval_end"] if period else None
interval_count = len(period)
period_length = interval_count * interval_length
period_length = interval_count * MINUTES_PER_INTERVAL
periods_remaining = len(periods) - period_idx
for interval_idx, interval in enumerate(period, 1):
interval_start = interval.get("interval_start")
@@ -349,7 +339,6 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
"period_length": period_length,
"interval_count": interval_count,
"interval_idx": interval_idx,
"interval_length": interval_length,
"period_count": period_count,
"periods_remaining": periods_remaining,
"period_idx": period_idx,
@@ -366,10 +355,9 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
result.append(new_interval)
return result
def _split_intervals_by_day(self, all_prices: list[dict]) -> tuple[dict, dict, dict]:
"""Split intervals by day, calculate interval minutes and average price per day."""
def _split_intervals_by_day(self, all_prices: list[dict]) -> tuple[dict, dict]:
"""Split intervals by day and calculate average price per day."""
intervals_by_day: dict = {}
interval_length_by_day: dict = {}
avg_price_by_day: dict = {}
for price_data in all_prices:
dt = dt_util.parse_datetime(price_data["startsAt"])
@@ -378,9 +366,8 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
date = dt.date()
intervals_by_day.setdefault(date, []).append(price_data)
for date, intervals in intervals_by_day.items():
interval_length_by_day[date] = detect_interval_granularity(intervals)
avg_price_by_day[date] = sum(float(p["total"]) for p in intervals) / len(intervals)
return intervals_by_day, interval_length_by_day, avg_price_by_day
return intervals_by_day, avg_price_by_day
def _calculate_reference_prices(self, intervals_by_day: dict, *, reverse_sort: bool) -> dict:
"""Calculate reference prices for each day."""
@@ -397,7 +384,6 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
self,
all_prices: list[dict],
ref_prices: dict,
interval_length_by_day: dict,
flex: float,
*,
reverse_sort: bool,
@@ -417,19 +403,14 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
starts_at = dt_util.as_local(starts_at)
date = starts_at.date()
ref_price = ref_prices[date]
interval_length = interval_length_by_day[date]
price = float(price_data["total"])
percent_diff = ((price - ref_price) / ref_price) * 100 if ref_price != 0 else 0.0
percent_diff = round(percent_diff, 2)
# For best price (flex >= 0): percent_diff <= flex*100 (prices up to flex% above reference)
# For peak price (flex <= 0): percent_diff >= -flex*100 (prices up to |flex|% above reference)
in_flex = percent_diff <= flex * 100 if not reverse_sort else percent_diff >= -flex * 100
# Split period if day or interval length changes
if (
last_ref_date is not None
and (date != last_ref_date or interval_length != interval_length_by_day[last_ref_date])
and current_period
):
# For peak price (flex <= 0): percent_diff >= flex*100 (prices down to |flex|% below reference)
in_flex = percent_diff <= flex * 100 if not reverse_sort else percent_diff >= flex * 100
# Split period if day changes
if last_ref_date is not None and date != last_ref_date and current_period:
periods.append(current_period)
current_period = []
last_ref_date = date
@@ -439,7 +420,6 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
"interval_hour": starts_at.hour,
"interval_minute": starts_at.minute,
"interval_time": f"{starts_at.hour:02d}:{starts_at.minute:02d}",
"interval_length_minute": interval_length,
"price": price,
"interval_start": starts_at,
}
@@ -458,9 +438,7 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
if idx + 1 < len(period):
interval["interval_end"] = period[idx + 1]["interval_start"]
else:
interval["interval_end"] = interval["interval_start"] + timedelta(
minutes=interval["interval_length_minute"]
)
interval["interval_end"] = interval["interval_start"] + timedelta(minutes=MINUTES_PER_INTERVAL)
def _filter_intervals_today_tomorrow(self, result: list[dict]) -> list[dict]:
"""Filter intervals to only include those from today and tomorrow."""
@@ -511,7 +489,7 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
if not all_prices:
return None
all_prices.sort(key=lambda p: p["startsAt"])
intervals_by_day, interval_length_by_day, avg_price_by_day = self._split_intervals_by_day(all_prices)
intervals_by_day, avg_price_by_day = self._split_intervals_by_day(all_prices)
ref_prices = self._calculate_reference_prices(intervals_by_day, reverse_sort=reverse_sort)
flex = self._get_flex_option(
CONF_BEST_PRICE_FLEX if not reverse_sort else CONF_PEAK_PRICE_FLEX,
@@ -520,22 +498,21 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
periods = self._build_periods(
all_prices,
ref_prices,
interval_length_by_day,
flex,
reverse_sort=reverse_sort,
)
self._add_interval_ends(periods)
# Only use periods relevant for today/tomorrow for annotation and attribute calculation
filtered_periods = self._filter_periods_today_tomorrow(periods)
# Use the last interval's interval_length for period annotation (approximate)
result = self._annotate_period_intervals(
filtered_periods,
ref_prices,
avg_price_by_day,
filtered_periods[-1][-1]["interval_length_minute"] if filtered_periods and filtered_periods[-1] else 60,
)
filtered_result = self._filter_intervals_today_tomorrow(result)
current_interval = self._find_current_or_next_interval(filtered_result)
if not current_interval and filtered_result:
current_interval = filtered_result[0]
attributes = {**current_interval} if current_interval else {}
attributes["intervals"] = filtered_result
return attributes

View file

@@ -380,16 +380,6 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
tomorrow_prices = price_info.get("tomorrow", [])
return today_prices + tomorrow_prices
def get_interval_granularity(self) -> int | None:
"""Get the granularity of price intervals in minutes."""
all_intervals = self.get_all_intervals()
if not all_intervals:
return None
from .sensor import detect_interval_granularity as detect_granularity
return detect_granularity(all_intervals)
async def refresh_user_data(self) -> bool:
"""Force refresh of user data and return True if data was updated."""
try:

View file

@@ -344,13 +344,12 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity):
return None
def _get_interval_price_value(self, *, interval_offset: int, in_euro: bool) -> float | None:
"""Get price for the current interval or with offset, handling different interval granularities."""
"""Get price for the current interval or with offset, handling 15-minute intervals."""
if not self.coordinator.data:
return None
all_intervals = self.coordinator.get_all_intervals()
granularity = self.coordinator.get_interval_granularity()
if not all_intervals or granularity is None:
if not all_intervals:
return None
now = dt_util.now()
@@ -360,7 +359,7 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity):
starts_at = interval.get("startsAt")
if starts_at:
ts = dt_util.parse_datetime(starts_at)
if ts and ts <= now < ts + timedelta(minutes=granularity):
if ts and ts <= now < ts + timedelta(minutes=MINUTES_PER_INTERVAL):
current_idx = idx
break
@@ -438,12 +437,9 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity):
return None
predicate = None
if rating_type == "hourly":
price_info = self.coordinator.data.get("priceInfo", {})
today_prices = price_info.get("today", [])
data_granularity = detect_interval_granularity(today_prices) if today_prices else MINUTES_PER_INTERVAL
def interval_predicate(entry_time: datetime) -> bool:
interval_end = entry_time + timedelta(minutes=data_granularity)
interval_end = entry_time + timedelta(minutes=MINUTES_PER_INTERVAL)
return entry_time <= now < interval_end and entry_time.date() == now.date()
predicate = interval_predicate
@@ -548,7 +544,6 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity):
price_info = self.coordinator.data.get("priceInfo", {})
price_rating = self.coordinator.data.get("priceRating", {})
# Determine data granularity from the current price data
today_prices = price_info.get("today", [])
tomorrow_prices = price_info.get("tomorrow", [])
all_prices = today_prices + tomorrow_prices
@@ -556,7 +551,6 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity):
if not all_prices:
return None
data_granularity = detect_interval_granularity(all_prices)
now = dt_util.now()
# Initialize the result list
@@ -581,7 +575,6 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity):
"rating_level": entry.get("level"),
}
# Create a list of all future price data points
for day_key in ["today", "tomorrow"]:
for price_data in price_info.get(day_key, []):
starts_at = dt_util.parse_datetime(price_data["startsAt"])
@@ -589,19 +582,16 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity):
continue
starts_at = dt_util.as_local(starts_at)
interval_end = starts_at + timedelta(minutes=data_granularity)
interval_end = starts_at + timedelta(minutes=MINUTES_PER_INTERVAL)
# Only include future intervals
if starts_at > now:
# Format timestamp for rating lookup
starts_at_key = starts_at.replace(second=0, microsecond=0).isoformat()
# Try to find rating data for this interval
interval_rating = rating_data.get(starts_at_key) or {}
future_prices.append(
{
"interval_start": starts_at.isoformat(), # Renamed from starts_at to interval_start
"interval_start": starts_at.isoformat(),
"interval_end": interval_end.isoformat(),
"price": float(price_data["total"]),
"price_cents": round(float(price_data["total"]) * 100, 2),
@@ -630,16 +620,6 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity):
attributes["intervals"] = future_prices
attributes["data_available"] = True
# Determine interval granularity for display purposes
min_intervals_for_granularity_detection = 2
if len(future_prices) >= min_intervals_for_granularity_detection:
start1 = datetime.fromisoformat(future_prices[0]["interval_start"])
start2 = datetime.fromisoformat(future_prices[1]["interval_start"])
minutes_diff = int((start2 - start1).total_seconds() / 60)
attributes["interval_minutes"] = minutes_diff
else:
attributes["interval_minutes"] = MINUTES_PER_INTERVAL
# Group by hour for easier consumption in dashboards
hours = {}
for interval in future_prices:
@@ -853,15 +833,11 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity):
if self.entity_description.key == "price_level" and current_interval_data and "level" in current_interval_data:
self._add_price_level_attributes(attributes, current_interval_data["level"])
# Add timestamp for next interval price sensors
if self.entity_description.key in ["next_interval_price", "next_interval_price_eur"]:
# Get the next interval's data
price_info = self.coordinator.data.get("priceInfo", {})
today_prices = price_info.get("today", [])
data_granularity = detect_interval_granularity(today_prices) if today_prices else MINUTES_PER_INTERVAL
now = dt_util.now()
next_interval_time = now + timedelta(minutes=data_granularity)
next_interval_data = find_price_data_for_interval(price_info, next_interval_time, data_granularity)
next_interval_time = now + timedelta(minutes=MINUTES_PER_INTERVAL)
next_interval_data = find_price_data_for_interval(price_info, next_interval_time)
attributes["timestamp"] = next_interval_data["startsAt"] if next_interval_data else None
def _add_price_level_attributes(self, attributes: dict, level: str) -> None:
@@ -897,9 +873,7 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity):
price_info = self.coordinator.data.get("priceInfo", {})
now = dt_util.now()
if key == "price_rating":
today_prices = price_info.get("today", [])
data_granularity = detect_interval_granularity(today_prices) if today_prices else MINUTES_PER_INTERVAL
interval_data = find_price_data_for_interval(price_info, now, data_granularity)
interval_data = find_price_data_for_interval(price_info, now)
attributes["timestamp"] = interval_data["startsAt"] if interval_data else None
if hasattr(self, "_last_rating_difference") and self._last_rating_difference is not None:
attributes["difference_" + PERCENTAGE] = self._last_rating_difference
@@ -931,133 +905,34 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity):
await self.coordinator.async_request_refresh()
def detect_interval_granularity(price_data: list[dict]) -> int:
def find_price_data_for_interval(price_info: Any, target_time: datetime) -> dict | None:
"""
Detect the granularity of price intervals in minutes.
Args:
price_data: List of price data points with startsAt timestamps
Returns:
Minutes per interval (e.g., 60 for hourly, 15 for 15-minute intervals)
"""
min_datapoints_for_granularity = 2
if not price_data or len(price_data) < min_datapoints_for_granularity:
return MINUTES_PER_INTERVAL # Default to target value
# Sort data points by timestamp
sorted_data = sorted(price_data, key=lambda x: x["startsAt"])
# Calculate the time differences between consecutive timestamps
intervals = []
for i in range(1, min(10, len(sorted_data))): # Sample up to 10 intervals
start_time_1 = dt_util.parse_datetime(sorted_data[i - 1]["startsAt"])
start_time_2 = dt_util.parse_datetime(sorted_data[i]["startsAt"])
if start_time_1 and start_time_2:
diff_minutes = (start_time_2 - start_time_1).total_seconds() / 60
intervals.append(round(diff_minutes))
# If no valid intervals found, return default
if not intervals:
return MINUTES_PER_INTERVAL
# Return the most common interval (mode)
return max(set(intervals), key=intervals.count)
def get_interval_for_timestamp(timestamp: datetime, granularity: int) -> int:
"""
Calculate the interval index within an hour for a given timestamp.
Args:
timestamp: The timestamp to calculate interval for
granularity: Minutes per interval
Returns:
Interval index (0-based) within the hour
"""
# Calculate which interval this timestamp falls into
intervals_per_hour = 60 // granularity
return (timestamp.minute // granularity) % intervals_per_hour
def _match_hourly_price_data(day_prices: list, target_time: datetime) -> dict | None:
"""Match price data for hourly granularity."""
for price_data in day_prices:
starts_at = dt_util.parse_datetime(price_data["startsAt"])
if starts_at is None:
continue
starts_at = dt_util.as_local(starts_at)
if starts_at.hour == target_time.hour and starts_at.date() == target_time.date():
return price_data
return None
def _match_granular_price_data(day_prices: list, target_time: datetime, data_granularity: int) -> dict | None:
"""Match price data for sub-hourly granularity."""
for price_data in day_prices:
starts_at = dt_util.parse_datetime(price_data["startsAt"])
if starts_at is None:
continue
starts_at = dt_util.as_local(starts_at)
interval_end = starts_at + timedelta(minutes=data_granularity)
# Check if target time falls within this interval
if starts_at <= target_time < interval_end and starts_at.date() == target_time.date():
return price_data
return None
def find_price_data_for_interval(
price_info: Any, target_time: datetime, data_granularity: int | None = None
) -> dict | None:
"""
Find the price data for a specific timestamp, handling different interval granularities.
Find the price data for a specific 15-minute interval timestamp.
Args:
price_info: The price info dictionary from Tibber API
target_time: The target timestamp to find price data for
data_granularity: Override detected granularity with this value (minutes)
Returns:
Price data dict if found, None otherwise
"""
# Determine which day's data to search
day_key = "tomorrow" if target_time.date() > dt_util.now().date() else "today"
search_days = [day_key, "tomorrow" if day_key == "today" else "today"]
# Try to find price data in today or tomorrow
for search_day in search_days:
day_prices = price_info.get(search_day, [])
if not day_prices:
continue
# Detect the granularity if not provided
if data_granularity is None:
data_granularity = detect_interval_granularity(day_prices)
for price_data in day_prices:
starts_at = dt_util.parse_datetime(price_data["startsAt"])
if starts_at is None:
continue
# Check for a match with appropriate granularity
if data_granularity >= MINUTES_PER_INTERVAL * 4: # 60 minutes = hourly
result = _match_hourly_price_data(day_prices, target_time)
else:
result = _match_granular_price_data(day_prices, target_time, data_granularity)
if result:
return result
# If not found and we have sub-hourly granularity, try to fall back to hourly data
if data_granularity is not None and data_granularity < MINUTES_PER_INTERVAL * 4:
hour_start = target_time.replace(minute=0, second=0, microsecond=0)
for search_day in search_days:
day_prices = price_info.get(search_day, [])
result = _match_hourly_price_data(day_prices, hour_start)
if result:
return result
starts_at = dt_util.as_local(starts_at)
interval_end = starts_at + timedelta(minutes=MINUTES_PER_INTERVAL)
if starts_at <= target_time < interval_end and starts_at.date() == target_time.date():
return price_data
return None

View file

@@ -147,6 +147,8 @@ async def _get_price(call: ServiceCall) -> dict[str, Any]:
for interval in merged:
if "previous_end_time" in interval:
del interval["previous_end_time"]
if "start_dt" in interval:
del interval["start_dt"]
response_ctx = PriceResponseContext(
price_stats=price_stats,

View file

@@ -79,8 +79,3 @@ class TestBasicCoordinator:
"""Test getting all intervals when no data is available."""
intervals = coordinator.get_all_intervals()
assert intervals == []
def test_get_interval_granularity(self, coordinator):
"""Test getting interval granularity."""
granularity = coordinator.get_interval_granularity()
assert granularity is None