refactor: Update interval attribute keys and improve period merging logic in TibberPricesBinarySensor

This commit is contained in:
Julian Pawlowski 2025-11-07 23:31:29 +00:00
parent ca88f136c3
commit 3df68db20b

View file

@ -14,7 +14,6 @@ from homeassistant.const import PERCENTAGE, EntityCategory
from homeassistant.core import callback
from homeassistant.util import dt as dt_util
from .average_utils import calculate_leading_24h_avg, calculate_trailing_24h_avg
from .coordinator import TIME_SENSITIVE_ENTITY_KEYS
from .entity import TibberPricesEntity
from .sensor import find_price_data_for_interval
@ -228,11 +227,11 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
if not self.coordinator.data:
return None
attrs = self._get_price_intervals_attributes(reverse_sort=False)
if not attrs or "interval_start" not in attrs or "interval_end" not in attrs:
if not attrs or "start" not in attrs or "end" not in attrs:
return None
now = dt_util.now()
start = attrs.get("interval_start")
end = attrs.get("interval_end")
start = attrs.get("start")
end = attrs.get("end")
return start <= now < end if start and end else None
def _peak_price_state(self) -> bool | None:
@ -240,11 +239,11 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
if not self.coordinator.data:
return None
attrs = self._get_price_intervals_attributes(reverse_sort=True)
if not attrs or "interval_start" not in attrs or "interval_end" not in attrs:
if not attrs or "start" not in attrs or "end" not in attrs:
return None
now = dt_util.now()
start = attrs.get("interval_start")
end = attrs.get("interval_end")
start = attrs.get("start")
end = attrs.get("end")
return start <= now < end if start and end else None
def _tomorrow_data_available_state(self) -> bool | None:
@ -321,77 +320,58 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
"""Annotate a single interval with all required attributes for Home Assistant UI and automations."""
interval_copy = interval.copy()
interval_remaining = annotation_ctx["interval_count"] - annotation_ctx["interval_idx"]
# Extract and keep internal interval fields for logic (with _ prefix)
interval_start = interval_copy.pop("interval_start", None)
interval_end = interval_copy.pop("interval_end", None)
interval_hour = interval_copy.pop("interval_hour", None)
interval_minute = interval_copy.pop("interval_minute", None)
interval_time = interval_copy.pop("interval_time", None)
price = interval_copy.pop("price", None)
# Remove other interval_* fields that are no longer needed
interval_copy.pop("interval_hour", None)
interval_copy.pop("interval_minute", None)
interval_copy.pop("interval_time", None)
# Remove startsAt - not needed anymore
interval_copy.pop("startsAt", None)
price_raw = interval_copy.pop("price", None)
new_interval = {
"period_start": annotation_ctx["period_start"],
"period_end": annotation_ctx["period_end"],
"hour": annotation_ctx["period_start_hour"],
"minute": annotation_ctx["period_start_minute"],
"time": annotation_ctx["period_start_time"],
"period_length_minute": annotation_ctx["period_length"],
"period_remaining_minute_after_interval": interval_remaining * MINUTES_PER_INTERVAL,
"duration_minutes": annotation_ctx["period_length"],
"remaining_minutes_after_interval": interval_remaining * MINUTES_PER_INTERVAL,
"periods_total": annotation_ctx["period_count"],
"periods_remaining": annotation_ctx["periods_remaining"],
"period_position": annotation_ctx["period_idx"],
"interval_total": annotation_ctx["interval_count"],
"interval_remaining": interval_remaining,
"interval_position": annotation_ctx["interval_idx"],
"interval_start": interval_start,
"interval_end": interval_end,
"interval_hour": interval_hour,
"interval_minute": interval_minute,
"interval_time": interval_time,
"price": price,
"price": round(price_raw * 100, 2) if price_raw is not None else None,
# Keep internal fields for logic (state checks, filtering)
"_interval_start": interval_start,
"_interval_end": interval_end,
}
new_interval.update(interval_copy)
new_interval["price_minor"] = round(new_interval["price"] * 100, 2)
price_diff = new_interval["price"] - annotation_ctx["ref_price"]
new_interval[annotation_ctx["diff_key"]] = round(price_diff, 4)
new_interval[annotation_ctx["diff_ct_key"]] = round(price_diff * 100, 2)
# Calculate percent difference from reference price (min or max)
price_diff_percent = (
((new_interval["price"] - annotation_ctx["ref_price"]) / annotation_ctx["ref_price"]) * 100
if annotation_ctx["ref_price"] != 0
else 0.0
)
new_interval[annotation_ctx["diff_pct_key"]] = round(price_diff_percent, 2)
# Calculate difference from average price for the day
avg_diff = new_interval["price"] - annotation_ctx["avg_price"]
new_interval["price_diff_from_avg"] = round(avg_diff, 4)
new_interval["price_diff_from_avg_minor"] = round(avg_diff * 100, 2)
avg_diff_percent = (
((new_interval["price"] - annotation_ctx["avg_price"]) / annotation_ctx["avg_price"]) * 100
if annotation_ctx["avg_price"] != 0
else 0.0
)
new_interval["price_diff_from_avg_" + PERCENTAGE] = round(avg_diff_percent, 2)
# Calculate difference from trailing 24-hour average
trailing_avg = annotation_ctx.get("trailing_24h_avg", 0.0)
trailing_avg_diff = new_interval["price"] - trailing_avg
new_interval["price_diff_from_trailing_24h_avg"] = round(trailing_avg_diff, 4)
new_interval["price_diff_from_trailing_24h_avg_minor"] = round(trailing_avg_diff * 100, 2)
trailing_avg_diff_percent = (
((new_interval["price"] - trailing_avg) / trailing_avg) * 100 if trailing_avg != 0 else 0.0
)
new_interval["price_diff_from_trailing_24h_avg_" + PERCENTAGE] = round(trailing_avg_diff_percent, 2)
new_interval["trailing_24h_avg_price"] = round(trailing_avg, 4)
new_interval["trailing_24h_avg_price_minor"] = round(trailing_avg * 100, 2)
# Calculate difference from leading 24-hour average
leading_avg = annotation_ctx.get("leading_24h_avg", 0.0)
leading_avg_diff = new_interval["price"] - leading_avg
new_interval["price_diff_from_leading_24h_avg"] = round(leading_avg_diff, 4)
new_interval["price_diff_from_leading_24h_avg_minor"] = round(leading_avg_diff * 100, 2)
leading_avg_diff_percent = (
((new_interval["price"] - leading_avg) / leading_avg) * 100 if leading_avg != 0 else 0.0
)
new_interval["price_diff_from_leading_24h_avg_" + PERCENTAGE] = round(leading_avg_diff_percent, 2)
new_interval["leading_24h_avg_price"] = round(leading_avg, 4)
new_interval["leading_24h_avg_price_minor"] = round(leading_avg * 100, 2)
# Add price_diff_from_min for best_price_period
if annotation_ctx.get("diff_key") == "price_diff_from_min":
price_diff = price_raw - annotation_ctx["ref_price"]
new_interval["price_diff_from_min"] = round(price_diff * 100, 2)
# Calculate percent difference from min price
price_diff_percent = (
((price_raw - annotation_ctx["ref_price"]) / annotation_ctx["ref_price"]) * 100
if annotation_ctx["ref_price"] != 0
else 0.0
)
new_interval["price_diff_from_min_" + PERCENTAGE] = round(price_diff_percent, 2)
# Add price_diff_from_max for peak_price_period
elif annotation_ctx.get("diff_key") == "price_diff_from_max":
price_diff = price_raw - annotation_ctx["ref_price"]
new_interval["price_diff_from_max"] = round(price_diff * 100, 2)
# Calculate percent difference from max price
price_diff_percent = (
((price_raw - annotation_ctx["ref_price"]) / annotation_ctx["ref_price"]) * 100
if annotation_ctx["ref_price"] != 0
else 0.0
)
new_interval["price_diff_from_max_" + PERCENTAGE] = round(price_diff_percent, 2)
return new_interval
def _annotate_period_intervals(
@ -399,7 +379,6 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
periods: list[list[dict]],
ref_prices: dict,
avg_price_by_day: dict,
all_prices: list[dict],
) -> list[dict]:
"""
Return flattened and annotated intervals with period info and requested properties.
@ -415,15 +394,12 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
reference_type = "ref"
if reference_type == "min":
diff_key = "price_diff_from_min"
diff_ct_key = "price_diff_from_min_minor"
diff_pct_key = "price_diff_from_min_" + PERCENTAGE
elif reference_type == "max":
diff_key = "price_diff_from_max"
diff_ct_key = "price_diff_from_max_minor"
diff_pct_key = "price_diff_from_max_" + PERCENTAGE
else:
diff_key = "price_diff"
diff_ct_key = "price_diff_minor"
diff_pct_key = "price_diff_" + PERCENTAGE
result = []
period_count = len(periods)
@ -441,10 +417,6 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
interval_date = interval_start.date() if interval_start else None
avg_price = avg_price_by_day.get(interval_date, 0)
ref_price = ref_prices.get(interval_date, 0)
# Calculate trailing 24-hour average for this interval
trailing_24h_avg = calculate_trailing_24h_avg(all_prices, interval_start) if interval_start else 0.0
# Calculate leading 24-hour average for this interval
leading_24h_avg = calculate_leading_24h_avg(all_prices, interval_start) if interval_start else 0.0
annotation_ctx = {
"period_start": period_start,
"period_end": period_end,
@ -459,10 +431,7 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
"period_idx": period_idx,
"ref_price": ref_price,
"avg_price": avg_price,
"trailing_24h_avg": trailing_24h_avg,
"leading_24h_avg": leading_24h_avg,
"diff_key": diff_key,
"diff_ct_key": diff_ct_key,
"diff_pct_key": diff_pct_key,
}
new_interval = self._annotate_single_interval(
@ -611,6 +580,61 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
# Filter out periods that are too short
return [period for period in periods if len(period) >= min_intervals]
def _merge_adjacent_periods_at_midnight(self, periods: list[list[dict]]) -> list[list[dict]]:
"""
Merge adjacent periods that meet at midnight.
When two periods are detected separately for today and tomorrow due to different
daily average prices, but they are directly adjacent at midnight, merge them into
a single period for better user experience.
Args:
periods: List of periods (each period is a list of interval dicts)
Returns:
List of periods with adjacent midnight periods merged
"""
if not periods:
return periods
merged = []
i = 0
while i < len(periods):
current_period = periods[i]
# Check if there's a next period and if they meet at midnight
if i + 1 < len(periods):
next_period = periods[i + 1]
# Get the last interval of current period and first interval of next period
last_interval = current_period[-1]
first_interval = next_period[0]
last_start = last_interval.get("interval_start")
next_start = first_interval.get("interval_start")
# Check if they are adjacent (15 minutes apart) and cross midnight
if last_start and next_start:
time_diff = next_start - last_start
last_date = last_start.date()
next_date = next_start.date()
# If they are 15 minutes apart and on different days (crossing midnight)
if time_diff == timedelta(minutes=MINUTES_PER_INTERVAL) and next_date > last_date:
# Merge the two periods
merged_period = current_period + next_period
merged.append(merged_period)
i += 2 # Skip both periods as we've merged them
continue
# If no merge happened, just add the current period
merged.append(current_period)
i += 1
return merged
def _add_interval_ends(self, periods: list[list[dict]]) -> None:
"""Add interval_end to each interval using per-interval interval_length."""
for period in periods:
@ -627,35 +651,122 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
return [
interval
for interval in result
if interval.get("interval_start") and today <= interval["interval_start"].date() <= tomorrow
if interval.get("_interval_start") and today <= interval["_interval_start"].date() <= tomorrow
]
def _find_current_or_next_interval(self, filtered_result: list[dict]) -> dict | None:
"""Find the current or next interval from the filtered list."""
now = dt_util.now()
for interval in filtered_result:
start = interval.get("interval_start")
end = interval.get("interval_end")
start = interval.get("_interval_start")
end = interval.get("_interval_end")
if start and end and start <= now < end:
return interval.copy()
for interval in filtered_result:
start = interval.get("interval_start")
start = interval.get("_interval_start")
if start and start > now:
return interval.copy()
return None
def _filter_periods_today_tomorrow(self, periods: list[list[dict]]) -> list[list[dict]]:
"""Filter periods to only those with at least one interval in today or tomorrow."""
today = dt_util.now().date()
"""
Filter periods to include those that are currently active or in the future.
This includes periods that started yesterday but are still active now (crossing midnight),
as well as periods happening today or tomorrow. We don't want to show all past periods
from yesterday, only those that extend into today.
"""
now = dt_util.now()
today = now.date()
tomorrow = today + timedelta(days=1)
return [
period
for period in periods
if any(
filtered = []
for period in periods:
if not period:
continue
# Get the period's end time (last interval's end)
last_interval = period[-1]
period_end = last_interval.get("interval_end")
# Get the period's start time (first interval's start)
first_interval = period[0]
period_start = first_interval.get("interval_start")
if not period_end or not period_start:
continue
# Include period if:
# 1. It's still active (ends in the future), OR
# 2. It has intervals today or tomorrow
if period_end > now or any(
interval.get("interval_start") and today <= interval["interval_start"].date() <= tomorrow
for interval in period
)
]
):
filtered.append(period)
return filtered
def _build_final_attributes(
self,
current_interval: dict | None,
periods_summary: list[dict],
filtered_result: list[dict],
) -> dict:
"""
Build the final attributes dictionary from period summary and current interval.
Combines period-level attributes with current interval-specific attributes,
ensuring price_diff reflects the current interval's position vs daily min/max.
"""
now = dt_util.now()
current_minute = (now.minute // 15) * 15
timestamp = now.replace(minute=current_minute, second=0, microsecond=0)
if current_interval and periods_summary:
# Find the current period in the summary based on period_start
current_period_start = current_interval.get("period_start")
current_period_summary = None
for period in periods_summary:
if period.get("start") == current_period_start:
current_period_summary = period
break
if current_period_summary:
# Copy all attributes from the period summary
attributes = {"timestamp": timestamp}
attributes.update(current_period_summary)
# Add interval-specific price_diff attributes (separate from period average)
# Shows the reference interval's position vs daily min/max:
# - If period is active: current 15-min interval vs daily min/max
# - If period hasn't started: first interval of the period vs daily min/max
# This value is what determines if an interval is part of a period (compared to flex setting)
if "price_diff_from_min" in current_interval:
attributes["interval_price_diff_from_daily_min"] = current_interval["price_diff_from_min"]
attributes["interval_price_diff_from_daily_min_%"] = current_interval.get("price_diff_from_min_%")
elif "price_diff_from_max" in current_interval:
attributes["interval_price_diff_from_daily_max"] = current_interval["price_diff_from_max"]
attributes["interval_price_diff_from_daily_max_%"] = current_interval.get("price_diff_from_max_%")
attributes["periods"] = periods_summary
attributes["intervals_count"] = len(filtered_result)
return attributes
# Fallback if current period not found in summary
return {
"timestamp": timestamp,
"periods": periods_summary,
"intervals_count": len(filtered_result),
}
# No periods found
return {
"timestamp": timestamp,
"periods": [],
"intervals_count": 0,
}
def _get_price_intervals_attributes(self, *, reverse_sort: bool) -> dict | None:
"""
@ -704,6 +815,7 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
periods = self._build_periods(all_prices, price_context, reverse_sort=reverse_sort)
periods = self._filter_periods_by_min_length(periods, reverse_sort=reverse_sort)
periods = self._merge_adjacent_periods_at_midnight(periods)
self._add_interval_ends(periods)
filtered_periods = self._filter_periods_today_tomorrow(periods)
@ -713,7 +825,6 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
filtered_periods,
ref_prices,
avg_price_by_day,
all_prices,
)
filtered_result = self._filter_intervals_today_tomorrow(result)
@ -722,20 +833,13 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
if not current_interval and filtered_result:
current_interval = filtered_result[0]
# Build attributes with current interval info but simplified period summary
attributes = {**current_interval} if current_interval else {}
# Build periods array first
periods_summary = self._build_periods_summary(filtered_result) if filtered_result else []
# Instead of full intervals list, provide period-level summary
# This reduces the attribute payload by 90%+
if filtered_result:
periods_summary = self._build_periods_summary(filtered_result)
attributes["periods"] = periods_summary
attributes["intervals_count"] = len(filtered_result)
else:
attributes["periods"] = []
attributes["intervals_count"] = 0
# Build final attributes using helper method
attributes = self._build_final_attributes(current_interval, periods_summary, filtered_result)
# Cache the result
# Cache the result (with internal fields intact)
self._cache_key = cache_key
self._period_cache = attributes
@ -743,13 +847,10 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
def _build_periods_summary(self, intervals: list[dict]) -> list[dict]:
"""
Build a summary of periods without including full interval details.
Build a summary of periods with consistent attribute structure.
Returns a list of period summaries with key information for automations:
- Period start/end times
- Duration
- Average/min/max prices
- Number of intervals
Returns a list of period summaries with the same attributes as top-level,
making the structure predictable and easy to use in automations.
"""
if not intervals:
return []
@ -764,33 +865,106 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
periods_dict[key_str] = []
periods_dict[key_str].append(interval)
# Build summary for each period
# Build summary for each period with consistent attribute names
summaries = []
for period_intervals in periods_dict.values():
if not period_intervals:
continue
first = period_intervals[0]
prices = [i["price"] for i in period_intervals if "price" in i]
# Use same attribute names as top-level for consistency
summary = {
"start": first.get("period_start"),
"end": first.get("period_end"),
"hour": first.get("hour"),
"minute": first.get("minute"),
"time": first.get("time"),
"duration_minutes": first.get("period_length_minute"),
"duration_minutes": first.get("duration_minutes"),
"periods_total": first.get("periods_total"),
"periods_remaining": first.get("periods_remaining"),
"period_position": first.get("period_position"),
"intervals_count": len(period_intervals),
"price_avg": round(sum(prices) / len(prices), 4) if prices else 0,
"price_min": round(min(prices), 4) if prices else 0,
"price_max": round(max(prices), 4) if prices else 0,
"price_avg": round(sum(prices) / len(prices), 2) if prices else 0,
"price_min": round(min(prices), 2) if prices else 0,
"price_max": round(max(prices), 2) if prices else 0,
}
# Add price_diff attributes if present
self._add_price_diff_for_period(summary, period_intervals, first)
summaries.append(summary)
return summaries
def _add_price_diff_for_period(self, summary: dict, period_intervals: list[dict], first: dict) -> None:
"""
Add price difference attributes for the period based on sensor type.
Uses the reference price (min/max) from the start day of the period to ensure
consistent comparison, especially for periods spanning midnight.
Calculates how the period's average price compares to the daily min/max,
helping to explain why the period qualifies based on flex settings.
"""
# Determine sensor type and get the reference price from the first interval
# (which represents the start of the period and its day's reference value)
if "price_diff_from_min" in first:
# Best price sensor: calculate difference from the period's start day minimum
period_start = first.get("period_start")
if not period_start:
return
# Get all prices in minor units (cents/øre) from the period
prices = [i["price"] for i in period_intervals if "price" in i]
if not prices:
return
period_avg_price = sum(prices) / len(prices)
# Extract the reference min price from first interval's calculation
# We can back-calculate it from the first interval's price and diff
first_price_minor = first.get("price")
first_diff_minor = first.get("price_diff_from_min")
if first_price_minor is not None and first_diff_minor is not None:
ref_min_price = first_price_minor - first_diff_minor
period_diff = period_avg_price - ref_min_price
# Period average price difference from daily minimum
summary["period_price_diff_from_daily_min"] = round(period_diff, 2)
if ref_min_price != 0:
period_diff_pct = (period_diff / ref_min_price) * 100
summary["period_price_diff_from_daily_min_%"] = round(period_diff_pct, 2)
elif "price_diff_from_max" in first:
# Peak price sensor: calculate difference from the period's start day maximum
period_start = first.get("period_start")
if not period_start:
return
# Get all prices in minor units (cents/øre) from the period
prices = [i["price"] for i in period_intervals if "price" in i]
if not prices:
return
period_avg_price = sum(prices) / len(prices)
# Extract the reference max price from first interval's calculation
first_price_minor = first.get("price")
first_diff_minor = first.get("price_diff_from_max")
if first_price_minor is not None and first_diff_minor is not None:
ref_max_price = first_price_minor - first_diff_minor
period_diff = period_avg_price - ref_max_price
# Period average price difference from daily maximum
summary["period_price_diff_from_daily_max"] = round(period_diff, 2)
if ref_max_price != 0:
period_diff_pct = (period_diff / ref_max_price) * 100
summary["period_price_diff_from_daily_max_%"] = round(period_diff_pct, 2)
def _get_price_hours_attributes(self, *, attribute_name: str, reverse_sort: bool) -> dict | None:
"""Get price hours attributes."""
if not self.coordinator.data:
@ -846,7 +1020,9 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
if self._attribute_getter:
dynamic_attrs = self._attribute_getter()
if dynamic_attrs:
attributes.update(dynamic_attrs)
# Copy and remove internal fields before exposing to user
clean_attrs = {k: v for k, v in dynamic_attrs.items() if not k.startswith("_")}
attributes.update(clean_attrs)
# Add descriptions from the custom translations file
if self.entity_description.translation_key and self.hass is not None:
@ -918,7 +1094,9 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
if self._attribute_getter:
dynamic_attrs = self._attribute_getter()
if dynamic_attrs:
attributes.update(dynamic_attrs)
# Copy and remove internal fields before exposing to user
clean_attrs = {k: v for k, v in dynamic_attrs.items() if not k.startswith("_")}
attributes.update(clean_attrs)
# Add descriptions from the cache (non-blocking)
if self.entity_description.translation_key and self.hass is not None: