refactoring

This commit is contained in:
Julian Pawlowski 2025-05-17 20:01:39 +00:00
parent 52cfc4a87f
commit 7c4ae98417

View file

@ -170,10 +170,10 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
now = dt_util.now()
# Detect interval granularity
interval_minutes = detect_interval_granularity(today_prices)
interval_length = detect_interval_granularity(today_prices)
# Find price data for current interval
current_interval_data = find_price_data_for_interval({"today": today_prices}, now, interval_minutes)
current_interval_data = find_price_data_for_interval({"today": today_prices}, now, interval_length)
if not current_interval_data:
return None
@ -182,15 +182,80 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
prices.sort()
return prices, float(current_interval_data["total"])
def _annotate_single_interval(
self,
interval: dict,
annotation_ctx: dict,
) -> dict:
"""Annotate a single interval with all required attributes."""
interval_copy = interval.copy()
interval_remaining = annotation_ctx["interval_count"] - annotation_ctx["interval_idx"]
# Extract all interval-related fields first
interval_start = interval_copy.pop("interval_start", None)
interval_end = interval_copy.pop("interval_end", None)
interval_hour = interval_copy.pop("interval_hour", None)
interval_minute = interval_copy.pop("interval_minute", None)
interval_time = interval_copy.pop("interval_time", None)
interval_length_minute = interval_copy.pop("interval_length_minute", annotation_ctx["interval_length"])
# Extract price
price = interval_copy.pop("price", None)
new_interval = {
"period_start": annotation_ctx["period_start"],
"period_end": annotation_ctx["period_end"],
"hour": annotation_ctx["period_start_hour"],
"minute": annotation_ctx["period_start_minute"],
"time": annotation_ctx["period_start_time"],
"period_length_minute": annotation_ctx["period_length"],
"period_remaining_minute_after_interval": interval_remaining * annotation_ctx["interval_length"],
"periods_total": annotation_ctx["period_count"],
"periods_remaining": annotation_ctx["periods_remaining"],
"period_position": annotation_ctx["period_idx"],
"interval_total": annotation_ctx["interval_count"],
"interval_remaining": interval_remaining,
"interval_position": annotation_ctx["interval_idx"],
"interval_start": interval_start,
"interval_end": interval_end,
"interval_hour": interval_hour,
"interval_minute": interval_minute,
"interval_time": interval_time,
"interval_length_minute": interval_length_minute,
"price": price,
}
# Add any remaining fields (should be only extra/unknowns)
new_interval.update(interval_copy)
new_interval["price_ct"] = round(new_interval["price"] * 100, 2)
price_diff = new_interval["price"] - annotation_ctx["ref_price"]
new_interval[annotation_ctx["diff_key"]] = round(price_diff, 4)
new_interval[annotation_ctx["diff_ct_key"]] = round(price_diff * 100, 2)
price_diff_percent = (
((new_interval["price"] - annotation_ctx["ref_price"]) / annotation_ctx["ref_price"]) * 100
if annotation_ctx["ref_price"] != 0
else 0.0
)
new_interval[annotation_ctx["diff_pct_key"]] = round(price_diff_percent, 2)
avg_diff = new_interval["price"] - annotation_ctx["avg_price"]
new_interval["price_diff_from_avg"] = round(avg_diff, 4)
new_interval["price_diff_from_avg_ct"] = round(avg_diff * 100, 2)
avg_diff_percent = (
((new_interval["price"] - annotation_ctx["avg_price"]) / annotation_ctx["avg_price"]) * 100
if annotation_ctx["avg_price"] != 0
else 0.0
)
new_interval["price_diff_from_avg_" + PERCENTAGE] = round(avg_diff_percent, 2)
return new_interval
def _annotate_period_intervals(
self,
periods: list[list[dict]],
ref_price: float,
avg_price: float,
interval_minutes: int,
ref_prices: dict,
avg_price_by_day: dict,
interval_length: int,
) -> list[dict]:
"""Return flattened and annotated intervals with period info and requested properties."""
# Determine reference type for naming
"""
Return flattened and annotated intervals with period info and requested properties.
Uses the correct reference price for each interval's date.
"""
reference_type = None
if self.entity_description.key == "best_price_period":
reference_type = "min"
@ -198,7 +263,6 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
reference_type = "max"
else:
reference_type = "ref"
# Set attribute name suffixes
if reference_type == "min":
diff_key = "price_diff_from_min"
diff_ct_key = "price_diff_from_min_ct"
@ -213,123 +277,121 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
diff_pct_key = "price_diff_" + PERCENTAGE
result = []
period_count = len(periods)
for idx, period in enumerate(periods, 1):
for period_idx, period in enumerate(periods, 1):
period_start = period[0]["interval_start"] if period else None
period_start_hour = period_start.hour if period_start else None
period_start_minute = period_start.minute if period_start else None
period_start_time = f"{period_start_hour:02d}:{period_start_minute:02d}" if period_start else None
period_end = period[-1]["interval_end"] if period else None
interval_count = len(period)
period_length = interval_count * interval_minutes
periods_remaining = len(periods) - idx
period_length = interval_count * interval_length
periods_remaining = len(periods) - period_idx
for interval_idx, interval in enumerate(period, 1):
interval_copy = interval.copy()
interval_remaining = interval_count - interval_idx
# Compose new dict with period-related keys first, then interval timing, then price info
new_interval = {
interval_start = interval.get("interval_start")
interval_date = interval_start.date() if interval_start else None
avg_price = avg_price_by_day.get(interval_date, 0)
ref_price = ref_prices.get(interval_date, 0)
annotation_ctx = {
"period_start": period_start,
"period_end": period_end,
"hour": period_start_hour,
"minute": period_start_minute,
"time": period_start_time,
"period_length_minute": period_length,
"period_remaining_minute_after_interval": interval_remaining * interval_minutes,
"periods_total": period_count,
"period_start_hour": period_start_hour,
"period_start_minute": period_start_minute,
"period_start_time": period_start_time,
"period_length": period_length,
"interval_count": interval_count,
"interval_idx": interval_idx,
"interval_length": interval_length,
"period_count": period_count,
"periods_remaining": periods_remaining,
"interval_total": interval_count,
"interval_remaining": interval_remaining,
"interval_position": interval_idx,
"period_idx": period_idx,
"ref_price": ref_price,
"avg_price": avg_price,
"diff_key": diff_key,
"diff_ct_key": diff_ct_key,
"diff_pct_key": diff_pct_key,
}
# Add interval timing
new_interval["interval_start"] = interval_copy.pop("interval_start", None)
new_interval["interval_end"] = interval_copy.pop("interval_end", None)
# Add hour, minute, time, price if present in interval_copy
for k in ("interval_hour", "interval_minute", "interval_time", "price"):
if k in interval_copy:
new_interval[k] = interval_copy.pop(k)
# Add the rest of the interval info (e.g. price_ct, price_difference_*, etc.)
new_interval.update(interval_copy)
new_interval["price_ct"] = round(new_interval["price"] * 100, 2)
price_diff = new_interval["price"] - ref_price
new_interval[diff_key] = round(price_diff, 4)
new_interval[diff_ct_key] = round(price_diff * 100, 2)
price_diff_percent = ((new_interval["price"] - ref_price) / ref_price) * 100 if ref_price != 0 else 0.0
new_interval[diff_pct_key] = round(price_diff_percent, 2)
# Add difference to average price of the day (avg_price is now passed in)
avg_diff = new_interval["price"] - avg_price
new_interval["price_diff_from_avg"] = round(avg_diff, 4)
new_interval["price_diff_from_avg_ct"] = round(avg_diff * 100, 2)
avg_diff_percent = ((new_interval["price"] - avg_price) / avg_price) * 100 if avg_price != 0 else 0.0
new_interval["price_diff_from_avg_" + PERCENTAGE] = round(avg_diff_percent, 2)
new_interval = self._annotate_single_interval(
interval,
annotation_ctx,
)
result.append(new_interval)
return result
def _get_price_intervals_attributes(self, *, reverse_sort: bool) -> dict | None:
def _split_intervals_by_day(self, all_prices: list[dict]) -> tuple[dict, dict, dict]:
"""Split intervals by day, calculate interval minutes and average price per day."""
intervals_by_day: dict = {}
interval_length_by_day: dict = {}
avg_price_by_day: dict = {}
for price_data in all_prices:
dt = dt_util.parse_datetime(price_data["startsAt"])
if dt is None:
continue
date = dt.date()
intervals_by_day.setdefault(date, []).append(price_data)
for date, intervals in intervals_by_day.items():
interval_length_by_day[date] = detect_interval_granularity(intervals)
avg_price_by_day[date] = sum(float(p["total"]) for p in intervals) / len(intervals)
return intervals_by_day, interval_length_by_day, avg_price_by_day
def _calculate_reference_prices(self, intervals_by_day: dict, *, reverse_sort: bool) -> dict:
"""Calculate reference prices for each day."""
ref_prices: dict = {}
for date, intervals in intervals_by_day.items():
prices = [float(p["total"]) for p in intervals]
if reverse_sort is False:
ref_prices[date] = min(prices)
else:
ref_prices[date] = max(prices)
return ref_prices
def _build_periods(
self,
all_prices: list[dict],
ref_prices: dict,
interval_length_by_day: dict,
flex: float,
*,
reverse_sort: bool,
) -> list[list[dict]]:
"""
Get price interval attributes with support for 15-minute intervals and period grouping.
Args:
reverse_sort: Whether to sort prices in reverse (high to low)
Returns:
Dictionary with interval data or None if not available
Build periods, allowing periods to cross midnight (day boundary).
Strictly enforce flex threshold by percent diff, matching attribute calculation.
"""
if not self.coordinator.data:
return None
price_info = self.coordinator.data["data"]["viewer"]["homes"][0]["currentSubscription"]["priceInfo"]
today_prices = price_info.get("today", [])
if not today_prices:
return None
interval_minutes = detect_interval_granularity(today_prices)
# Use entity type to determine flex and logic, but always use 'price_intervals' as attribute name
if reverse_sort is False: # best_price_period entity
flex = self._get_flex_option(CONF_BEST_PRICE_FLEX, DEFAULT_BEST_PRICE_FLEX)
prices = [float(p["total"]) for p in today_prices]
min_price = min(prices)
def in_range(price: float) -> bool:
return price <= min_price * (1 + flex)
ref_price = min_price
elif reverse_sort is True: # peak_price_period entity
flex = self._get_flex_option(CONF_PEAK_PRICE_FLEX, DEFAULT_PEAK_PRICE_FLEX)
prices = [float(p["total"]) for p in today_prices]
max_price = max(prices)
def in_range(price: float) -> bool:
return price >= max_price * (1 - flex)
ref_price = max_price
else:
return None
# Calculate average price for the day (all intervals, not just periods)
all_prices = [float(p["total"]) for p in today_prices]
avg_price = sum(all_prices) / len(all_prices) if all_prices else 0.0
# Build intervals with period grouping
periods = []
current_period = []
for price_data in today_prices:
periods: list[list[dict]] = []
current_period: list[dict] = []
last_ref_date = None
for price_data in all_prices:
starts_at = dt_util.parse_datetime(price_data["startsAt"])
if starts_at is None:
continue
starts_at = dt_util.as_local(starts_at)
date = starts_at.date()
ref_price = ref_prices[date]
interval_length = interval_length_by_day[date]
price = float(price_data["total"])
if in_range(price):
percent_diff = ((price - ref_price) / ref_price) * 100 if ref_price != 0 else 0.0
percent_diff = round(percent_diff, 2)
# For best price: percent_diff <= flex*100; for peak: percent_diff >= -flex*100
in_flex = percent_diff <= flex * 100 if not reverse_sort else percent_diff >= -flex * 100
# Split period if day or interval length changes
if (
last_ref_date is not None
and (date != last_ref_date or interval_length != interval_length_by_day[last_ref_date])
and current_period
):
periods.append(current_period)
current_period = []
last_ref_date = date
if in_flex:
current_period.append(
{
"interval_hour": starts_at.hour,
"interval_minute": starts_at.minute,
"interval_time": f"{starts_at.hour:02d}:{starts_at.minute:02d}",
"interval_length_minute": interval_length,
"price": price,
"interval_start": starts_at,
# interval_end will be filled later
}
)
elif current_period:
@ -337,37 +399,95 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
current_period = []
if current_period:
periods.append(current_period)
return periods
# Add interval_end to each interval (next interval's start or None)
def _add_interval_ends(self, periods: list[list[dict]]) -> None:
"""Add interval_end to each interval using per-interval interval_length."""
for period in periods:
for idx, interval in enumerate(period):
if idx + 1 < len(period):
interval["interval_end"] = period[idx + 1]["interval_start"]
else:
# Try to estimate end as start + interval_minutes
interval["interval_end"] = interval["interval_start"] + timedelta(minutes=interval_minutes)
interval["interval_end"] = interval["interval_start"] + timedelta(
minutes=interval["interval_length_minute"]
)
result = self._annotate_period_intervals(periods, ref_price, avg_price, interval_minutes)
def _filter_intervals_today_tomorrow(self, result: list[dict]) -> list[dict]:
"""Filter intervals to only include those from today and tomorrow."""
today = dt_util.now().date()
tomorrow = today + timedelta(days=1)
return [
interval
for interval in result
if interval.get("interval_start") and today <= interval["interval_start"].date() <= tomorrow
]
# Find the current or next interval (by time) from the annotated result
def _find_current_or_next_interval(self, filtered_result: list[dict]) -> dict | None:
"""Find the current or next interval from the filtered list."""
now = dt_util.now()
current_interval = None
for interval in result:
for interval in filtered_result:
start = interval.get("interval_start")
end = interval.get("interval_end")
if start and end and start <= now < end:
current_interval = interval.copy()
break
else:
# If no current interval, show the next period's first interval (if available)
for interval in result:
start = interval.get("interval_start")
if start and start > now:
current_interval = interval.copy()
break
return interval.copy()
for interval in filtered_result:
start = interval.get("interval_start")
if start and start > now:
return interval.copy()
return None
def _filter_periods_today_tomorrow(self, periods: list[list[dict]]) -> list[list[dict]]:
"""Filter periods to only those with at least one interval in today or tomorrow."""
today = dt_util.now().date()
tomorrow = today + timedelta(days=1)
return [
period
for period in periods
if any(
interval.get("interval_start") and today <= interval["interval_start"].date() <= tomorrow
for interval in period
)
]
def _get_price_intervals_attributes(self, *, reverse_sort: bool) -> dict | None:
"""Get price interval attributes with support for 15-minute intervals and period grouping."""
if not self.coordinator.data:
return None
price_info = self.coordinator.data["data"]["viewer"]["homes"][0]["currentSubscription"]["priceInfo"]
yesterday_prices = price_info.get("yesterday", [])
today_prices = price_info.get("today", [])
tomorrow_prices = price_info.get("tomorrow", [])
all_prices = yesterday_prices + today_prices + tomorrow_prices
if not all_prices:
return None
all_prices.sort(key=lambda p: p["startsAt"])
intervals_by_day, interval_length_by_day, avg_price_by_day = self._split_intervals_by_day(all_prices)
ref_prices = self._calculate_reference_prices(intervals_by_day, reverse_sort=reverse_sort)
flex = self._get_flex_option(
CONF_BEST_PRICE_FLEX if not reverse_sort else CONF_PEAK_PRICE_FLEX,
DEFAULT_BEST_PRICE_FLEX if not reverse_sort else DEFAULT_PEAK_PRICE_FLEX,
)
periods = self._build_periods(
all_prices,
ref_prices,
interval_length_by_day,
flex,
reverse_sort=reverse_sort,
)
self._add_interval_ends(periods)
# Only use periods relevant for today/tomorrow for annotation and attribute calculation
filtered_periods = self._filter_periods_today_tomorrow(periods)
# Use the last interval's interval_length for period annotation (approximate)
result = self._annotate_period_intervals(
filtered_periods,
ref_prices,
avg_price_by_day,
filtered_periods[-1][-1]["interval_length_minute"] if filtered_periods and filtered_periods[-1] else 60,
)
filtered_result = self._filter_intervals_today_tomorrow(result)
current_interval = self._find_current_or_next_interval(filtered_result)
attributes = {**current_interval} if current_interval else {}
attributes["intervals"] = result
attributes["intervals"] = filtered_result
return attributes
def _get_price_hours_attributes(self, *, attribute_name: str, reverse_sort: bool) -> dict | None: