diff --git a/custom_components/tibber_prices/binary_sensor.py b/custom_components/tibber_prices/binary_sensor.py index d6ac79a..0887e08 100644 --- a/custom_components/tibber_prices/binary_sensor.py +++ b/custom_components/tibber_prices/binary_sensor.py @@ -170,10 +170,10 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity): now = dt_util.now() # Detect interval granularity - interval_minutes = detect_interval_granularity(today_prices) + interval_length = detect_interval_granularity(today_prices) # Find price data for current interval - current_interval_data = find_price_data_for_interval({"today": today_prices}, now, interval_minutes) + current_interval_data = find_price_data_for_interval({"today": today_prices}, now, interval_length) if not current_interval_data: return None @@ -182,15 +182,80 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity): prices.sort() return prices, float(current_interval_data["total"]) + def _annotate_single_interval( + self, + interval: dict, + annotation_ctx: dict, + ) -> dict: + """Annotate a single interval with all required attributes.""" + interval_copy = interval.copy() + interval_remaining = annotation_ctx["interval_count"] - annotation_ctx["interval_idx"] + # Extract all interval-related fields first + interval_start = interval_copy.pop("interval_start", None) + interval_end = interval_copy.pop("interval_end", None) + interval_hour = interval_copy.pop("interval_hour", None) + interval_minute = interval_copy.pop("interval_minute", None) + interval_time = interval_copy.pop("interval_time", None) + interval_length_minute = interval_copy.pop("interval_length_minute", annotation_ctx["interval_length"]) + # Extract price + price = interval_copy.pop("price", None) + new_interval = { + "period_start": annotation_ctx["period_start"], + "period_end": annotation_ctx["period_end"], + "hour": annotation_ctx["period_start_hour"], + "minute": annotation_ctx["period_start_minute"], + "time": annotation_ctx["period_start_time"], + "period_length_minute": annotation_ctx["period_length"], + "period_remaining_minute_after_interval": interval_remaining * annotation_ctx["interval_length"], + "periods_total": annotation_ctx["period_count"], + "periods_remaining": annotation_ctx["periods_remaining"], + "period_position": annotation_ctx["period_idx"], + "interval_total": annotation_ctx["interval_count"], + "interval_remaining": interval_remaining, + "interval_position": annotation_ctx["interval_idx"], + "interval_start": interval_start, + "interval_end": interval_end, + "interval_hour": interval_hour, + "interval_minute": interval_minute, + "interval_time": interval_time, + "interval_length_minute": interval_length_minute, + "price": price, + } + # Add any remaining fields (should be only extra/unknowns) + new_interval.update(interval_copy) + new_interval["price_ct"] = round(new_interval["price"] * 100, 2) + price_diff = new_interval["price"] - annotation_ctx["ref_price"] + new_interval[annotation_ctx["diff_key"]] = round(price_diff, 4) + new_interval[annotation_ctx["diff_ct_key"]] = round(price_diff * 100, 2) + price_diff_percent = ( + ((new_interval["price"] - annotation_ctx["ref_price"]) / annotation_ctx["ref_price"]) * 100 + if annotation_ctx["ref_price"] != 0 + else 0.0 + ) + new_interval[annotation_ctx["diff_pct_key"]] = round(price_diff_percent, 2) + avg_diff = new_interval["price"] - annotation_ctx["avg_price"] + new_interval["price_diff_from_avg"] = round(avg_diff, 4) + new_interval["price_diff_from_avg_ct"] = round(avg_diff * 100, 2) + avg_diff_percent = ( + ((new_interval["price"] - annotation_ctx["avg_price"]) / annotation_ctx["avg_price"]) * 100 + if annotation_ctx["avg_price"] != 0 + else 0.0 + ) + new_interval["price_diff_from_avg_" + PERCENTAGE] = round(avg_diff_percent, 2) + return new_interval + def _annotate_period_intervals( self, periods: list[list[dict]], - ref_price: float, - avg_price: float, - interval_minutes: int, + ref_prices: dict, + avg_price_by_day: dict, + interval_length: int, ) -> list[dict]: - """Return flattened and annotated intervals with period info and requested properties.""" - # Determine reference type for naming + """ + Return flattened and annotated intervals with period info and requested properties. + + Uses the correct reference price for each interval's date. + """ reference_type = None if self.entity_description.key == "best_price_period": reference_type = "min" @@ -198,7 +263,6 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity): reference_type = "max" else: reference_type = "ref" - # Set attribute name suffixes if reference_type == "min": diff_key = "price_diff_from_min" diff_ct_key = "price_diff_from_min_ct" @@ -213,123 +277,121 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity): diff_pct_key = "price_diff_" + PERCENTAGE result = [] period_count = len(periods) - for idx, period in enumerate(periods, 1): + for period_idx, period in enumerate(periods, 1): period_start = period[0]["interval_start"] if period else None period_start_hour = period_start.hour if period_start else None period_start_minute = period_start.minute if period_start else None period_start_time = f"{period_start_hour:02d}:{period_start_minute:02d}" if period_start else None period_end = period[-1]["interval_end"] if period else None interval_count = len(period) - period_length = interval_count * interval_minutes - periods_remaining = len(periods) - idx + period_length = interval_count * interval_length + periods_remaining = len(periods) - period_idx for interval_idx, interval in enumerate(period, 1): - interval_copy = interval.copy() - interval_remaining = interval_count - interval_idx - # Compose new dict with period-related keys first, then interval timing, then price info - new_interval = { + interval_start = interval.get("interval_start") + interval_date = interval_start.date() if interval_start else None + avg_price = avg_price_by_day.get(interval_date, 0) + ref_price = ref_prices.get(interval_date, 0) + annotation_ctx = { "period_start": period_start, "period_end": period_end, - "hour": period_start_hour, - "minute": period_start_minute, - "time": period_start_time, - "period_length_minute": period_length, - "period_remaining_minute_after_interval": interval_remaining * interval_minutes, - "periods_total": period_count, + "period_start_hour": period_start_hour, + "period_start_minute": period_start_minute, + "period_start_time": period_start_time, + "period_length": period_length, + "interval_count": interval_count, + "interval_idx": interval_idx, + "interval_length": interval_length, + "period_count": period_count, "periods_remaining": periods_remaining, - "interval_total": interval_count, - "interval_remaining": interval_remaining, - "interval_position": interval_idx, + "period_idx": period_idx, + "ref_price": ref_price, + "avg_price": avg_price, + "diff_key": diff_key, + "diff_ct_key": diff_ct_key, + "diff_pct_key": diff_pct_key, } - # Add interval timing - new_interval["interval_start"] = interval_copy.pop("interval_start", None) - new_interval["interval_end"] = interval_copy.pop("interval_end", None) - # Add hour, minute, time, price if present in interval_copy - for k in ("interval_hour", "interval_minute", "interval_time", "price"): - if k in interval_copy: - new_interval[k] = interval_copy.pop(k) - # Add the rest of the interval info (e.g. price_ct, price_difference_*, etc.) - new_interval.update(interval_copy) - new_interval["price_ct"] = round(new_interval["price"] * 100, 2) - price_diff = new_interval["price"] - ref_price - new_interval[diff_key] = round(price_diff, 4) - new_interval[diff_ct_key] = round(price_diff * 100, 2) - price_diff_percent = ((new_interval["price"] - ref_price) / ref_price) * 100 if ref_price != 0 else 0.0 - new_interval[diff_pct_key] = round(price_diff_percent, 2) - # Add difference to average price of the day (avg_price is now passed in) - avg_diff = new_interval["price"] - avg_price - new_interval["price_diff_from_avg"] = round(avg_diff, 4) - new_interval["price_diff_from_avg_ct"] = round(avg_diff * 100, 2) - avg_diff_percent = ((new_interval["price"] - avg_price) / avg_price) * 100 if avg_price != 0 else 0.0 - new_interval["price_diff_from_avg_" + PERCENTAGE] = round(avg_diff_percent, 2) + new_interval = self._annotate_single_interval( + interval, + annotation_ctx, + ) result.append(new_interval) return result - def _get_price_intervals_attributes(self, *, reverse_sort: bool) -> dict | None: + def _split_intervals_by_day(self, all_prices: list[dict]) -> tuple[dict, dict, dict]: + """Split intervals by day, calculate interval minutes and average price per day.""" + intervals_by_day: dict = {} + interval_length_by_day: dict = {} + avg_price_by_day: dict = {} + for price_data in all_prices: + dt = dt_util.parse_datetime(price_data["startsAt"]) + if dt is None: + continue + date = dt.date() + intervals_by_day.setdefault(date, []).append(price_data) + for date, intervals in intervals_by_day.items(): + interval_length_by_day[date] = detect_interval_granularity(intervals) + avg_price_by_day[date] = sum(float(p["total"]) for p in intervals) / len(intervals) + return intervals_by_day, interval_length_by_day, avg_price_by_day + + def _calculate_reference_prices(self, intervals_by_day: dict, *, reverse_sort: bool) -> dict: + """Calculate reference prices for each day.""" + ref_prices: dict = {} + for date, intervals in intervals_by_day.items(): + prices = [float(p["total"]) for p in intervals] + if reverse_sort is False: + ref_prices[date] = min(prices) + else: + ref_prices[date] = max(prices) + return ref_prices + + def _build_periods( + self, + all_prices: list[dict], + ref_prices: dict, + interval_length_by_day: dict, + flex: float, + *, + reverse_sort: bool, + ) -> list[list[dict]]: """ - Get price interval attributes with support for 15-minute intervals and period grouping. - - Args: - reverse_sort: Whether to sort prices in reverse (high to low) - - Returns: - Dictionary with interval data or None if not available + Build periods, allowing periods to cross midnight (day boundary). + Strictly enforce flex threshold by percent diff, matching attribute calculation. """ - if not self.coordinator.data: - return None - - price_info = self.coordinator.data["data"]["viewer"]["homes"][0]["currentSubscription"]["priceInfo"] - today_prices = price_info.get("today", []) - - if not today_prices: - return None - - interval_minutes = detect_interval_granularity(today_prices) - - # Use entity type to determine flex and logic, but always use 'price_intervals' as attribute name - if reverse_sort is False: # best_price_period entity - flex = self._get_flex_option(CONF_BEST_PRICE_FLEX, DEFAULT_BEST_PRICE_FLEX) - prices = [float(p["total"]) for p in today_prices] - min_price = min(prices) - - def in_range(price: float) -> bool: - return price <= min_price * (1 + flex) - - ref_price = min_price - elif reverse_sort is True: # peak_price_period entity - flex = self._get_flex_option(CONF_PEAK_PRICE_FLEX, DEFAULT_PEAK_PRICE_FLEX) - prices = [float(p["total"]) for p in today_prices] - max_price = max(prices) - - def in_range(price: float) -> bool: - return price >= max_price * (1 - flex) - - ref_price = max_price - else: - return None - - # Calculate average price for the day (all intervals, not just periods) - all_prices = [float(p["total"]) for p in today_prices] - avg_price = sum(all_prices) / len(all_prices) if all_prices else 0.0 - - # Build intervals with period grouping - periods = [] - current_period = [] - for price_data in today_prices: + periods: list[list[dict]] = [] + current_period: list[dict] = [] + last_ref_date = None + for price_data in all_prices: starts_at = dt_util.parse_datetime(price_data["startsAt"]) if starts_at is None: continue starts_at = dt_util.as_local(starts_at) + date = starts_at.date() + ref_price = ref_prices[date] + interval_length = interval_length_by_day[date] price = float(price_data["total"]) - if in_range(price): + percent_diff = ((price - ref_price) / ref_price) * 100 if ref_price != 0 else 0.0 + percent_diff = round(percent_diff, 2) + # For best price: percent_diff <= flex*100; for peak: percent_diff >= -flex*100 + in_flex = percent_diff <= flex * 100 if not reverse_sort else percent_diff >= -flex * 100 + # Split period if day or interval length changes + if ( + last_ref_date is not None + and (date != last_ref_date or interval_length != interval_length_by_day[last_ref_date]) + and current_period + ): + periods.append(current_period) + current_period = [] + last_ref_date = date + if in_flex: current_period.append( { "interval_hour": starts_at.hour, "interval_minute": starts_at.minute, "interval_time": f"{starts_at.hour:02d}:{starts_at.minute:02d}", + "interval_length_minute": interval_length, "price": price, "interval_start": starts_at, - # interval_end will be filled later } ) elif current_period: @@ -337,37 +399,95 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity): current_period = [] if current_period: periods.append(current_period) + return periods - # Add interval_end to each interval (next interval's start or None) + def _add_interval_ends(self, periods: list[list[dict]]) -> None: + """Add interval_end to each interval using per-interval interval_length.""" for period in periods: for idx, interval in enumerate(period): if idx + 1 < len(period): interval["interval_end"] = period[idx + 1]["interval_start"] else: - # Try to estimate end as start + interval_minutes - interval["interval_end"] = interval["interval_start"] + timedelta(minutes=interval_minutes) + interval["interval_end"] = interval["interval_start"] + timedelta( + minutes=interval["interval_length_minute"] + ) - result = self._annotate_period_intervals(periods, ref_price, avg_price, interval_minutes) + def _filter_intervals_today_tomorrow(self, result: list[dict]) -> list[dict]: + """Filter intervals to only include those from today and tomorrow.""" + today = dt_util.now().date() + tomorrow = today + timedelta(days=1) + return [ + interval + for interval in result + if interval.get("interval_start") and today <= interval["interval_start"].date() <= tomorrow + ] - # Find the current or next interval (by time) from the annotated result + def _find_current_or_next_interval(self, filtered_result: list[dict]) -> dict | None: + """Find the current or next interval from the filtered list.""" now = dt_util.now() - current_interval = None - for interval in result: + for interval in filtered_result: start = interval.get("interval_start") end = interval.get("interval_end") if start and end and start <= now < end: - current_interval = interval.copy() - break - else: - # If no current interval, show the next period's first interval (if available) - for interval in result: - start = interval.get("interval_start") - if start and start > now: - current_interval = interval.copy() - break + return interval.copy() + for interval in filtered_result: + start = interval.get("interval_start") + if start and start > now: + return interval.copy() + return None + def _filter_periods_today_tomorrow(self, periods: list[list[dict]]) -> list[list[dict]]: + """Filter periods to only those with at least one interval in today or tomorrow.""" + today = dt_util.now().date() + tomorrow = today + timedelta(days=1) + return [ + period + for period in periods + if any( + interval.get("interval_start") and today <= interval["interval_start"].date() <= tomorrow + for interval in period + ) + ] + + def _get_price_intervals_attributes(self, *, reverse_sort: bool) -> dict | None: + """Get price interval attributes with support for 15-minute intervals and period grouping.""" + if not self.coordinator.data: + return None + price_info = self.coordinator.data["data"]["viewer"]["homes"][0]["currentSubscription"]["priceInfo"] + yesterday_prices = price_info.get("yesterday", []) + today_prices = price_info.get("today", []) + tomorrow_prices = price_info.get("tomorrow", []) + all_prices = yesterday_prices + today_prices + tomorrow_prices + if not all_prices: + return None + all_prices.sort(key=lambda p: p["startsAt"]) + intervals_by_day, interval_length_by_day, avg_price_by_day = self._split_intervals_by_day(all_prices) + ref_prices = self._calculate_reference_prices(intervals_by_day, reverse_sort=reverse_sort) + flex = self._get_flex_option( + CONF_BEST_PRICE_FLEX if not reverse_sort else CONF_PEAK_PRICE_FLEX, + DEFAULT_BEST_PRICE_FLEX if not reverse_sort else DEFAULT_PEAK_PRICE_FLEX, + ) + periods = self._build_periods( + all_prices, + ref_prices, + interval_length_by_day, + flex, + reverse_sort=reverse_sort, + ) + self._add_interval_ends(periods) + # Only use periods relevant for today/tomorrow for annotation and attribute calculation + filtered_periods = self._filter_periods_today_tomorrow(periods) + # Use the last interval's interval_length for period annotation (approximate) + result = self._annotate_period_intervals( + filtered_periods, + ref_prices, + avg_price_by_day, + filtered_periods[-1][-1]["interval_length_minute"] if filtered_periods and filtered_periods[-1] else 60, + ) + filtered_result = self._filter_intervals_today_tomorrow(result) + current_interval = self._find_current_or_next_interval(filtered_result) attributes = {**current_interval} if current_interval else {} - attributes["intervals"] = result + attributes["intervals"] = filtered_result return attributes def _get_price_hours_attributes(self, *, attribute_name: str, reverse_sort: bool) -> dict | None: