diff --git a/custom_components/tibber_prices/binary_sensor.py b/custom_components/tibber_prices/binary_sensor.py index f59ab5d..af25453 100644 --- a/custom_components/tibber_prices/binary_sensor.py +++ b/custom_components/tibber_prices/binary_sensor.py @@ -14,7 +14,7 @@ from homeassistant.const import PERCENTAGE, EntityCategory from homeassistant.util import dt as dt_util from .entity import TibberPricesEntity -from .sensor import detect_interval_granularity, find_price_data_for_interval +from .sensor import find_price_data_for_interval if TYPE_CHECKING: from collections.abc import Callable @@ -32,9 +32,8 @@ from .const import ( DEFAULT_PEAK_PRICE_FLEX, ) -MIN_TOMORROW_INTERVALS_HOURLY = 24 +MINUTES_PER_INTERVAL = 15 MIN_TOMORROW_INTERVALS_15MIN = 96 -TOMORROW_INTERVAL_COUNTS = {MIN_TOMORROW_INTERVALS_HOURLY, MIN_TOMORROW_INTERVALS_15MIN} ENTITY_DESCRIPTIONS = ( BinarySensorEntityDescription( @@ -167,7 +166,7 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity): price_info = self.coordinator.data.get("priceInfo", {}) tomorrow_prices = price_info.get("tomorrow", []) interval_count = len(tomorrow_prices) - if interval_count in TOMORROW_INTERVAL_COUNTS: + if interval_count == MIN_TOMORROW_INTERVALS_15MIN: return True if interval_count == 0: return False @@ -182,7 +181,7 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity): interval_count = len(tomorrow_prices) if interval_count == 0: status = "none" - elif interval_count in TOMORROW_INTERVAL_COUNTS: + elif interval_count == MIN_TOMORROW_INTERVALS_15MIN: status = "full" else: status = "partial" @@ -217,11 +216,7 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity): now = dt_util.now() - # Detect interval granularity - interval_length = detect_interval_granularity(today_prices) - - # Find price data for current interval - current_interval_data = find_price_data_for_interval({"today": today_prices}, now, interval_length) + current_interval_data = find_price_data_for_interval({"today": today_prices}, now) if not current_interval_data: return None @@ -238,13 +233,11 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity): """Annotate a single interval with all required attributes for Home Assistant UI and automations.""" interval_copy = interval.copy() interval_remaining = annotation_ctx["interval_count"] - annotation_ctx["interval_idx"] - # Extract interval-related fields for attribute ordering and clarity interval_start = interval_copy.pop("interval_start", None) interval_end = interval_copy.pop("interval_end", None) interval_hour = interval_copy.pop("interval_hour", None) interval_minute = interval_copy.pop("interval_minute", None) interval_time = interval_copy.pop("interval_time", None) - interval_length_minute = interval_copy.pop("interval_length_minute", annotation_ctx["interval_length"]) price = interval_copy.pop("price", None) new_interval = { "period_start": annotation_ctx["period_start"], @@ -253,7 +246,7 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity): "minute": annotation_ctx["period_start_minute"], "time": annotation_ctx["period_start_time"], "period_length_minute": annotation_ctx["period_length"], - "period_remaining_minute_after_interval": interval_remaining * annotation_ctx["interval_length"], + "period_remaining_minute_after_interval": interval_remaining * MINUTES_PER_INTERVAL, "periods_total": annotation_ctx["period_count"], "periods_remaining": annotation_ctx["periods_remaining"], "period_position": annotation_ctx["period_idx"], @@ -265,10 +258,8 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity): "interval_hour": interval_hour, "interval_minute": interval_minute, "interval_time": interval_time, - "interval_length_minute": interval_length_minute, "price": price, } - # Merge any extra fields from the original interval (future-proofing) new_interval.update(interval_copy) new_interval["price_minor"] = round(new_interval["price"] * 100, 2) price_diff = new_interval["price"] - annotation_ctx["ref_price"] @@ -298,7 +289,6 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity): periods: list[list[dict]], ref_prices: dict, avg_price_by_day: dict, - interval_length: int, ) -> list[dict]: """ Return flattened and annotated intervals with period info and requested properties. @@ -333,7 +323,7 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity): period_start_time = f"{period_start_hour:02d}:{period_start_minute:02d}" if period_start else None period_end = period[-1]["interval_end"] if period else None interval_count = len(period) - period_length = interval_count * interval_length + period_length = interval_count * MINUTES_PER_INTERVAL periods_remaining = len(periods) - period_idx for interval_idx, interval in enumerate(period, 1): interval_start = interval.get("interval_start") @@ -349,7 +339,6 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity): "period_length": period_length, "interval_count": interval_count, "interval_idx": interval_idx, - "interval_length": interval_length, "period_count": period_count, "periods_remaining": periods_remaining, "period_idx": period_idx, @@ -366,10 +355,9 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity): result.append(new_interval) return result - def _split_intervals_by_day(self, all_prices: list[dict]) -> tuple[dict, dict, dict]: - """Split intervals by day, calculate interval minutes and average price per day.""" + def _split_intervals_by_day(self, all_prices: list[dict]) -> tuple[dict, dict]: + """Split intervals by day and calculate average price per day.""" intervals_by_day: dict = {} - interval_length_by_day: dict = {} avg_price_by_day: dict = {} for price_data in all_prices: dt = dt_util.parse_datetime(price_data["startsAt"]) @@ -378,9 +366,8 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity): date = dt.date() intervals_by_day.setdefault(date, []).append(price_data) for date, intervals in intervals_by_day.items(): - interval_length_by_day[date] = detect_interval_granularity(intervals) avg_price_by_day[date] = sum(float(p["total"]) for p in intervals) / len(intervals) - return intervals_by_day, interval_length_by_day, avg_price_by_day + return intervals_by_day, avg_price_by_day def _calculate_reference_prices(self, intervals_by_day: dict, *, reverse_sort: bool) -> dict: """Calculate reference prices for each day.""" @@ -397,7 +384,6 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity): self, all_prices: list[dict], ref_prices: dict, - interval_length_by_day: dict, flex: float, *, reverse_sort: bool, @@ -417,19 +403,14 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity): starts_at = dt_util.as_local(starts_at) date = starts_at.date() ref_price = ref_prices[date] - interval_length = interval_length_by_day[date] price = float(price_data["total"]) percent_diff = ((price - ref_price) / ref_price) * 100 if ref_price != 0 else 0.0 percent_diff = round(percent_diff, 2) # For best price (flex >= 0): percent_diff <= flex*100 (prices up to flex% above reference) - # For peak price (flex <= 0): percent_diff >= -flex*100 (prices up to |flex|% above reference) - in_flex = percent_diff <= flex * 100 if not reverse_sort else percent_diff >= -flex * 100 - # Split period if day or interval length changes - if ( - last_ref_date is not None - and (date != last_ref_date or interval_length != interval_length_by_day[last_ref_date]) - and current_period - ): + # For peak price (flex <= 0): percent_diff >= flex*100 (prices down to |flex|% below reference) + in_flex = percent_diff <= flex * 100 if not reverse_sort else percent_diff >= flex * 100 + # Split period if day changes + if last_ref_date is not None and date != last_ref_date and current_period: periods.append(current_period) current_period = [] last_ref_date = date @@ -439,7 +420,6 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity): "interval_hour": starts_at.hour, "interval_minute": starts_at.minute, "interval_time": f"{starts_at.hour:02d}:{starts_at.minute:02d}", - "interval_length_minute": interval_length, "price": price, "interval_start": starts_at, } @@ -458,9 +438,7 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity): if idx + 1 < len(period): interval["interval_end"] = period[idx + 1]["interval_start"] else: - interval["interval_end"] = interval["interval_start"] + timedelta( - minutes=interval["interval_length_minute"] - ) + interval["interval_end"] = interval["interval_start"] + timedelta(minutes=MINUTES_PER_INTERVAL) def _filter_intervals_today_tomorrow(self, result: list[dict]) -> list[dict]: """Filter intervals to only include those from today and tomorrow.""" @@ -511,7 +489,7 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity): if not all_prices: return None all_prices.sort(key=lambda p: p["startsAt"]) - intervals_by_day, interval_length_by_day, avg_price_by_day = self._split_intervals_by_day(all_prices) + intervals_by_day, avg_price_by_day = self._split_intervals_by_day(all_prices) ref_prices = self._calculate_reference_prices(intervals_by_day, reverse_sort=reverse_sort) flex = self._get_flex_option( CONF_BEST_PRICE_FLEX if not reverse_sort else CONF_PEAK_PRICE_FLEX, @@ -520,22 +498,21 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity): periods = self._build_periods( all_prices, ref_prices, - interval_length_by_day, flex, reverse_sort=reverse_sort, ) self._add_interval_ends(periods) # Only use periods relevant for today/tomorrow for annotation and attribute calculation filtered_periods = self._filter_periods_today_tomorrow(periods) - # Use the last interval's interval_length for period annotation (approximate) result = self._annotate_period_intervals( filtered_periods, ref_prices, avg_price_by_day, - filtered_periods[-1][-1]["interval_length_minute"] if filtered_periods and filtered_periods[-1] else 60, ) filtered_result = self._filter_intervals_today_tomorrow(result) current_interval = self._find_current_or_next_interval(filtered_result) + if not current_interval and filtered_result: + current_interval = filtered_result[0] attributes = {**current_interval} if current_interval else {} attributes["intervals"] = filtered_result return attributes diff --git a/custom_components/tibber_prices/coordinator.py b/custom_components/tibber_prices/coordinator.py index 49ad2b1..2a25814 100644 --- a/custom_components/tibber_prices/coordinator.py +++ b/custom_components/tibber_prices/coordinator.py @@ -380,16 +380,6 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]): tomorrow_prices = price_info.get("tomorrow", []) return today_prices + tomorrow_prices - def get_interval_granularity(self) -> int | None: - """Get the granularity of price intervals in minutes.""" - all_intervals = self.get_all_intervals() - if not all_intervals: - return None - - from .sensor import detect_interval_granularity as detect_granularity - - return detect_granularity(all_intervals) - async def refresh_user_data(self) -> bool: """Force refresh of user data and return True if data was updated.""" try: diff --git a/custom_components/tibber_prices/sensor.py b/custom_components/tibber_prices/sensor.py index 12997cc..6f408f1 100644 --- a/custom_components/tibber_prices/sensor.py +++ b/custom_components/tibber_prices/sensor.py @@ -344,13 +344,12 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity): return None def _get_interval_price_value(self, *, interval_offset: int, in_euro: bool) -> float | None: - """Get price for the current interval or with offset, handling different interval granularities.""" + """Get price for the current interval or with offset, handling 15-minute intervals.""" if not self.coordinator.data: return None all_intervals = self.coordinator.get_all_intervals() - granularity = self.coordinator.get_interval_granularity() - if not all_intervals or granularity is None: + if not all_intervals: return None now = dt_util.now() @@ -360,7 +359,7 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity): starts_at = interval.get("startsAt") if starts_at: ts = dt_util.parse_datetime(starts_at) - if ts and ts <= now < ts + timedelta(minutes=granularity): + if ts and ts <= now < ts + timedelta(minutes=MINUTES_PER_INTERVAL): current_idx = idx break @@ -438,12 +437,9 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity): return None predicate = None if rating_type == "hourly": - price_info = self.coordinator.data.get("priceInfo", {}) - today_prices = price_info.get("today", []) - data_granularity = detect_interval_granularity(today_prices) if today_prices else MINUTES_PER_INTERVAL def interval_predicate(entry_time: datetime) -> bool: - interval_end = entry_time + timedelta(minutes=data_granularity) + interval_end = entry_time + timedelta(minutes=MINUTES_PER_INTERVAL) return entry_time <= now < interval_end and entry_time.date() == now.date() predicate = interval_predicate @@ -548,7 +544,6 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity): price_info = self.coordinator.data.get("priceInfo", {}) price_rating = self.coordinator.data.get("priceRating", {}) - # Determine data granularity from the current price data today_prices = price_info.get("today", []) tomorrow_prices = price_info.get("tomorrow", []) all_prices = today_prices + tomorrow_prices @@ -556,7 +551,6 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity): if not all_prices: return None - data_granularity = detect_interval_granularity(all_prices) now = dt_util.now() # Initialize the result list @@ -581,7 +575,6 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity): "rating_level": entry.get("level"), } - # Create a list of all future price data points for day_key in ["today", "tomorrow"]: for price_data in price_info.get(day_key, []): starts_at = dt_util.parse_datetime(price_data["startsAt"]) @@ -589,19 +582,16 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity): continue starts_at = dt_util.as_local(starts_at) - interval_end = starts_at + timedelta(minutes=data_granularity) + interval_end = starts_at + timedelta(minutes=MINUTES_PER_INTERVAL) - # Only include future intervals if starts_at > now: - # Format timestamp for rating lookup starts_at_key = starts_at.replace(second=0, microsecond=0).isoformat() - # Try to find rating data for this interval interval_rating = rating_data.get(starts_at_key) or {} future_prices.append( { - "interval_start": starts_at.isoformat(), # Renamed from starts_at to interval_start + "interval_start": starts_at.isoformat(), "interval_end": interval_end.isoformat(), "price": float(price_data["total"]), "price_cents": round(float(price_data["total"]) * 100, 2), @@ -630,16 +620,6 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity): attributes["intervals"] = future_prices attributes["data_available"] = True - # Determine interval granularity for display purposes - min_intervals_for_granularity_detection = 2 - if len(future_prices) >= min_intervals_for_granularity_detection: - start1 = datetime.fromisoformat(future_prices[0]["interval_start"]) - start2 = datetime.fromisoformat(future_prices[1]["interval_start"]) - minutes_diff = int((start2 - start1).total_seconds() / 60) - attributes["interval_minutes"] = minutes_diff - else: - attributes["interval_minutes"] = MINUTES_PER_INTERVAL - # Group by hour for easier consumption in dashboards hours = {} for interval in future_prices: @@ -853,15 +833,11 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity): if self.entity_description.key == "price_level" and current_interval_data and "level" in current_interval_data: self._add_price_level_attributes(attributes, current_interval_data["level"]) - # Add timestamp for next interval price sensors if self.entity_description.key in ["next_interval_price", "next_interval_price_eur"]: - # Get the next interval's data price_info = self.coordinator.data.get("priceInfo", {}) - today_prices = price_info.get("today", []) - data_granularity = detect_interval_granularity(today_prices) if today_prices else MINUTES_PER_INTERVAL now = dt_util.now() - next_interval_time = now + timedelta(minutes=data_granularity) - next_interval_data = find_price_data_for_interval(price_info, next_interval_time, data_granularity) + next_interval_time = now + timedelta(minutes=MINUTES_PER_INTERVAL) + next_interval_data = find_price_data_for_interval(price_info, next_interval_time) attributes["timestamp"] = next_interval_data["startsAt"] if next_interval_data else None def _add_price_level_attributes(self, attributes: dict, level: str) -> None: @@ -897,9 +873,7 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity): price_info = self.coordinator.data.get("priceInfo", {}) now = dt_util.now() if key == "price_rating": - today_prices = price_info.get("today", []) - data_granularity = detect_interval_granularity(today_prices) if today_prices else MINUTES_PER_INTERVAL - interval_data = find_price_data_for_interval(price_info, now, data_granularity) + interval_data = find_price_data_for_interval(price_info, now) attributes["timestamp"] = interval_data["startsAt"] if interval_data else None if hasattr(self, "_last_rating_difference") and self._last_rating_difference is not None: attributes["difference_" + PERCENTAGE] = self._last_rating_difference @@ -931,133 +905,34 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity): await self.coordinator.async_request_refresh() -def detect_interval_granularity(price_data: list[dict]) -> int: +def find_price_data_for_interval(price_info: Any, target_time: datetime) -> dict | None: """ - Detect the granularity of price intervals in minutes. - - Args: - price_data: List of price data points with startsAt timestamps - - Returns: - Minutes per interval (e.g., 60 for hourly, 15 for 15-minute intervals) - - """ - min_datapoints_for_granularity = 2 - if not price_data or len(price_data) < min_datapoints_for_granularity: - return MINUTES_PER_INTERVAL # Default to target value - - # Sort data points by timestamp - sorted_data = sorted(price_data, key=lambda x: x["startsAt"]) - - # Calculate the time differences between consecutive timestamps - intervals = [] - for i in range(1, min(10, len(sorted_data))): # Sample up to 10 intervals - start_time_1 = dt_util.parse_datetime(sorted_data[i - 1]["startsAt"]) - start_time_2 = dt_util.parse_datetime(sorted_data[i]["startsAt"]) - - if start_time_1 and start_time_2: - diff_minutes = (start_time_2 - start_time_1).total_seconds() / 60 - intervals.append(round(diff_minutes)) - - # If no valid intervals found, return default - if not intervals: - return MINUTES_PER_INTERVAL - - # Return the most common interval (mode) - return max(set(intervals), key=intervals.count) - - -def get_interval_for_timestamp(timestamp: datetime, granularity: int) -> int: - """ - Calculate the interval index within an hour for a given timestamp. - - Args: - timestamp: The timestamp to calculate interval for - granularity: Minutes per interval - - Returns: - Interval index (0-based) within the hour - - """ - # Calculate which interval this timestamp falls into - intervals_per_hour = 60 // granularity - return (timestamp.minute // granularity) % intervals_per_hour - - -def _match_hourly_price_data(day_prices: list, target_time: datetime) -> dict | None: - """Match price data for hourly granularity.""" - for price_data in day_prices: - starts_at = dt_util.parse_datetime(price_data["startsAt"]) - if starts_at is None: - continue - - starts_at = dt_util.as_local(starts_at) - if starts_at.hour == target_time.hour and starts_at.date() == target_time.date(): - return price_data - return None - - -def _match_granular_price_data(day_prices: list, target_time: datetime, data_granularity: int) -> dict | None: - """Match price data for sub-hourly granularity.""" - for price_data in day_prices: - starts_at = dt_util.parse_datetime(price_data["startsAt"]) - if starts_at is None: - continue - - starts_at = dt_util.as_local(starts_at) - interval_end = starts_at + timedelta(minutes=data_granularity) - # Check if target time falls within this interval - if starts_at <= target_time < interval_end and starts_at.date() == target_time.date(): - return price_data - return None - - -def find_price_data_for_interval( - price_info: Any, target_time: datetime, data_granularity: int | None = None -) -> dict | None: - """ - Find the price data for a specific timestamp, handling different interval granularities. + Find the price data for a specific 15-minute interval timestamp. Args: price_info: The price info dictionary from Tibber API target_time: The target timestamp to find price data for - data_granularity: Override detected granularity with this value (minutes) Returns: Price data dict if found, None otherwise """ - # Determine which day's data to search day_key = "tomorrow" if target_time.date() > dt_util.now().date() else "today" search_days = [day_key, "tomorrow" if day_key == "today" else "today"] - # Try to find price data in today or tomorrow for search_day in search_days: day_prices = price_info.get(search_day, []) if not day_prices: continue - # Detect the granularity if not provided - if data_granularity is None: - data_granularity = detect_interval_granularity(day_prices) + for price_data in day_prices: + starts_at = dt_util.parse_datetime(price_data["startsAt"]) + if starts_at is None: + continue - # Check for a match with appropriate granularity - if data_granularity >= MINUTES_PER_INTERVAL * 4: # 60 minutes = hourly - result = _match_hourly_price_data(day_prices, target_time) - else: - result = _match_granular_price_data(day_prices, target_time, data_granularity) - - if result: - return result - - # If not found and we have sub-hourly granularity, try to fall back to hourly data - if data_granularity is not None and data_granularity < MINUTES_PER_INTERVAL * 4: - hour_start = target_time.replace(minute=0, second=0, microsecond=0) - - for search_day in search_days: - day_prices = price_info.get(search_day, []) - result = _match_hourly_price_data(day_prices, hour_start) - if result: - return result + starts_at = dt_util.as_local(starts_at) + interval_end = starts_at + timedelta(minutes=MINUTES_PER_INTERVAL) + if starts_at <= target_time < interval_end and starts_at.date() == target_time.date(): + return price_data return None diff --git a/custom_components/tibber_prices/services.py b/custom_components/tibber_prices/services.py index 95948c8..b8ea9b8 100644 --- a/custom_components/tibber_prices/services.py +++ b/custom_components/tibber_prices/services.py @@ -147,6 +147,8 @@ async def _get_price(call: ServiceCall) -> dict[str, Any]: for interval in merged: if "previous_end_time" in interval: del interval["previous_end_time"] + if "start_dt" in interval: + del interval["start_dt"] response_ctx = PriceResponseContext( price_stats=price_stats, diff --git a/tests/test_coordinator_basic.py b/tests/test_coordinator_basic.py index 8097faf..aabfd46 100644 --- a/tests/test_coordinator_basic.py +++ b/tests/test_coordinator_basic.py @@ -79,8 +79,3 @@ class TestBasicCoordinator: """Test getting all intervals when no data is available.""" intervals = coordinator.get_all_intervals() assert intervals == [] - - def test_get_interval_granularity(self, coordinator): - """Test getting interval granularity.""" - granularity = coordinator.get_interval_granularity() - assert granularity is None