From c3173a16d6918101be6c5aa3be44878502a394eb Mon Sep 17 00:00:00 2001 From: Julian Pawlowski Date: Fri, 17 Apr 2026 08:52:17 +0000 Subject: [PATCH] refactor(attributes): streamline phase type retrieval and attribute building Consolidate logic for determining current price phase and associated attributes by introducing shared helper functions. This enhances code maintainability and reduces duplication across components. Impact: Improved clarity and efficiency in price phase handling for users. --- .../tibber_prices/binary_sensor/attributes.py | 53 ++++--- .../tibber_prices/binary_sensor/core.py | 4 + .../sensor/attributes/metadata.py | 134 +++++++++++------- .../tibber_prices/sensor/attributes/timing.py | 30 ++-- .../sensor/calculators/metadata.py | 76 +++------- .../tibber_prices/sensor/definitions.py | 9 +- 6 files changed, 158 insertions(+), 148 deletions(-) diff --git a/custom_components/tibber_prices/binary_sensor/attributes.py b/custom_components/tibber_prices/binary_sensor/attributes.py index 9a30800..36d8af7 100644 --- a/custom_components/tibber_prices/binary_sensor/attributes.py +++ b/custom_components/tibber_prices/binary_sensor/attributes.py @@ -7,6 +7,7 @@ from typing import TYPE_CHECKING from custom_components.tibber_prices.const import get_display_unit_factor from custom_components.tibber_prices.coordinator.helpers import get_intervals_for_day_offsets from custom_components.tibber_prices.entity_utils import add_icon_color_attribute +from custom_components.tibber_prices.sensor.attributes.metadata import _find_current_segment_in_data # Constants for price display conversion _SUBUNIT_FACTOR = 100 # Conversion factor for subunit currency (ct/øre) @@ -29,8 +30,7 @@ def get_current_phase_type(coordinator_data: dict, *, time: TibberPricesTimeServ """ Return the type of the currently active intra-day price phase. - Walks today's segments and returns the type ("rising", "falling", or "flat") - of the last segment whose start time is ≤ now. + Delegates to the shared segment finder in sensor/attributes/metadata.py. Args: coordinator_data: The coordinator's data dict. @@ -42,32 +42,39 @@ def get_current_phase_type(coordinator_data: dict, *, time: TibberPricesTimeServ """ if not coordinator_data: return None + current_index, segments = _find_current_segment_in_data(coordinator_data, time=time) + if current_index is None or segments is None: + return None + return segments[current_index].get("type") - day_patterns = coordinator_data.get("dayPatterns") - if not day_patterns: + +def get_phase_attributes(coordinator_data: dict, *, time: TibberPricesTimeService) -> dict | None: + """ + Build start/end attributes for in_*_price_phase binary sensors. + + Args: + coordinator_data: The coordinator's data dict. + time: TibberPricesTimeService instance. + + Returns: + Dict with start and end timestamps, or None if unavailable. + + """ + if not coordinator_data: + return None + current_index, segments = _find_current_segment_in_data(coordinator_data, time=time) + if current_index is None or segments is None: return None - today_data = day_patterns.get("today") - if not today_data: - return None + segment = segments[current_index] + attrs: dict = {} - segments: list[dict] | None = today_data.get("segments") - if not segments: - return None + if start := segment.get("start"): + attrs["start"] = start + if end := segment.get("end"): + attrs["end"] = end - from homeassistant.util.dt import parse_datetime # noqa: PLC0415 - - now = time.now() - current_type: str | None = None - for segment in segments: - seg_start_str: str | None = segment.get("start") - if not seg_start_str: - continue - seg_start = parse_datetime(seg_start_str) - if seg_start is not None and now >= seg_start: - current_type = segment.get("type") - - return current_type + return attrs or None def get_tomorrow_data_available_attributes( diff --git a/custom_components/tibber_prices/binary_sensor/core.py b/custom_components/tibber_prices/binary_sensor/core.py index d1a8dea..4065b73 100644 --- a/custom_components/tibber_prices/binary_sensor/core.py +++ b/custom_components/tibber_prices/binary_sensor/core.py @@ -18,6 +18,7 @@ from .attributes import ( build_async_extra_state_attributes, build_sync_extra_state_attributes, get_current_phase_type, + get_phase_attributes, get_price_intervals_attributes, get_tomorrow_data_available_attributes, ) @@ -316,6 +317,9 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity, RestoreEn if key == "tomorrow_data_available": return self._get_tomorrow_data_available_attributes() + if key in ("in_rising_price_phase", "in_falling_price_phase", "in_flat_price_phase"): + return get_phase_attributes(self.coordinator.data, time=self.coordinator.time) + return None @callback diff --git a/custom_components/tibber_prices/sensor/attributes/metadata.py b/custom_components/tibber_prices/sensor/attributes/metadata.py index c223a73..7b71e35 100644 --- a/custom_components/tibber_prices/sensor/attributes/metadata.py +++ b/custom_components/tibber_prices/sensor/attributes/metadata.py @@ -11,6 +11,54 @@ if TYPE_CHECKING: from custom_components.tibber_prices.coordinator.time_service import TibberPricesTimeService +def _find_current_segment_in_data( + coordinator_data: dict, + *, + time: TibberPricesTimeService, +) -> tuple[int, list[dict[str, Any]]] | tuple[None, None]: + """ + Find the currently active segment in today's day pattern data. + + Shared helper for all phase-related attribute/value lookups. + + Args: + coordinator_data: Coordinator's data dict (must not be None). + time: TibberPricesTimeService instance. + + Returns: + Tuple of (current_index, segments) or (None, None) if unavailable. + + """ + day_patterns = coordinator_data.get("dayPatterns") + if not day_patterns: + return None, None + + today_data: dict[str, Any] | None = day_patterns.get("today") + if not today_data: + return None, None + + segments: list[dict[str, Any]] | None = today_data.get("segments") + if not segments: + return None, None + + from homeassistant.util.dt import parse_datetime # noqa: PLC0415 + + now = time.now() + current_index: int | None = None + for i, segment in enumerate(segments): + seg_start_str: str | None = segment.get("start") + if not seg_start_str: + continue + seg_start = parse_datetime(seg_start_str) + if seg_start is not None and now >= seg_start: + current_index = i + + if current_index is None: + return None, None + + return current_index, segments + + def get_current_interval_data( coordinator: TibberPricesDataUpdateCoordinator, *, @@ -131,31 +179,8 @@ def get_current_price_phase_attributes( if not coordinator.data: return None - day_patterns = coordinator.data.get("dayPatterns") - if not day_patterns: - return None - - today_data: dict[str, Any] | None = day_patterns.get("today") - if not today_data: - return None - - segments: list[dict[str, Any]] | None = today_data.get("segments") - if not segments: - return None - - from homeassistant.util.dt import parse_datetime # noqa: PLC0415 - - now = time.now() - current_index: int | None = None - for i, segment in enumerate(segments): - seg_start_str: str | None = segment.get("start") - if not seg_start_str: - continue - seg_start = parse_datetime(seg_start_str) - if seg_start is not None and now >= seg_start: - current_index = i - - if current_index is None: + current_index, segments = _find_current_segment_in_data(coordinator.data, time=time) + if current_index is None or segments is None: return None seg = segments[current_index] @@ -180,8 +205,8 @@ def get_next_price_phase_attributes( """ Build attributes for the next_price_phase sensor. - Returns details of the segment that follows the currently active one, - including its start time (useful for scheduling automations). + Returns details of the segment that follows the currently active one. + If today's current segment is the last, falls back to tomorrow's first segment. Args: coordinator: The data update coordinator. @@ -194,41 +219,42 @@ def get_next_price_phase_attributes( if not coordinator.data: return None + current_index, segments = _find_current_segment_in_data(coordinator.data, time=time) + if current_index is None or segments is None: + return None + + # Next segment in today + if current_index + 1 < len(segments): + next_seg = segments[current_index + 1] + attrs: dict[str, Any] = { + "start": next_seg.get("start"), + "end": next_seg.get("end"), + "price_min": next_seg.get("price_min"), + "price_max": next_seg.get("price_max"), + "price_mean": next_seg.get("price_mean"), + "segment_index": current_index + 1, + "segment_count": len(segments), + } + return attrs + + # Fall back to tomorrow's first segment day_patterns = coordinator.data.get("dayPatterns") if not day_patterns: return None - - today_data: dict[str, Any] | None = day_patterns.get("today") - if not today_data: + tomorrow_data = day_patterns.get("tomorrow") + if not tomorrow_data: return None - - segments: list[dict[str, Any]] | None = today_data.get("segments") - if not segments: + tomorrow_segments: list[dict[str, Any]] = tomorrow_data.get("segments", []) + if not tomorrow_segments: return None - - from homeassistant.util.dt import parse_datetime # noqa: PLC0415 - - now = time.now() - current_index: int | None = None - for i, segment in enumerate(segments): - seg_start_str: str | None = segment.get("start") - if not seg_start_str: - continue - seg_start = parse_datetime(seg_start_str) - if seg_start is not None and now >= seg_start: - current_index = i - - if current_index is None or current_index + 1 >= len(segments): - return None - - next_seg = segments[current_index + 1] - attrs: dict[str, Any] = { + next_seg = tomorrow_segments[0] + return { "start": next_seg.get("start"), "end": next_seg.get("end"), "price_min": next_seg.get("price_min"), "price_max": next_seg.get("price_max"), "price_mean": next_seg.get("price_mean"), - "segment_index": current_index + 1, - "segment_count": len(segments), + "segment_index": 0, + "segment_count": len(tomorrow_segments), + "is_tomorrow": True, } - return attrs diff --git a/custom_components/tibber_prices/sensor/attributes/timing.py b/custom_components/tibber_prices/sensor/attributes/timing.py index 841b5c1..cd6665c 100644 --- a/custom_components/tibber_prices/sensor/attributes/timing.py +++ b/custom_components/tibber_prices/sensor/attributes/timing.py @@ -12,6 +12,22 @@ if TYPE_CHECKING: # Timer #3 triggers every 30 seconds TIMER_30_SEC_BOUNDARY = 30 +# Phase timing sensor keys — allocated once at module level +_PHASE_TIMING_KEYS = frozenset( + { + "current_price_phase_end_time", + "current_price_phase_remaining_minutes", + "current_price_phase_duration", + "current_price_phase_progress", + "next_rising_phase_start_time", + "next_falling_phase_start_time", + "next_flat_phase_start_time", + "next_rising_phase_in_minutes", + "next_falling_phase_in_minutes", + "next_flat_phase_in_minutes", + } +) + def _hours_to_minutes(state_value: Any) -> int | None: """Convert hour-based state back to rounded minutes for attributes.""" @@ -34,20 +50,6 @@ def _is_timing_or_volatility_sensor(key: str) -> bool: ): return True # price phase timing sensors - _PHASE_TIMING_KEYS = frozenset( - { - "current_price_phase_end_time", - "current_price_phase_remaining_minutes", - "current_price_phase_duration", - "current_price_phase_progress", - "next_rising_phase_start_time", - "next_falling_phase_start_time", - "next_flat_phase_start_time", - "next_rising_phase_in_minutes", - "next_falling_phase_in_minutes", - "next_flat_phase_in_minutes", - } - ) return key in _PHASE_TIMING_KEYS diff --git a/custom_components/tibber_prices/sensor/calculators/metadata.py b/custom_components/tibber_prices/sensor/calculators/metadata.py index e1305bb..998a38e 100644 --- a/custom_components/tibber_prices/sensor/calculators/metadata.py +++ b/custom_components/tibber_prices/sensor/calculators/metadata.py @@ -149,80 +149,44 @@ class TibberPricesMetadataCalculator(TibberPricesBaseCalculator): "rising", "falling", or "flat", or None if data is unavailable. """ - if not self.coordinator.data: + current_index, segments = self._find_current_segment() + if current_index is None or segments is None: return None - - day_patterns = self.coordinator.data.get("dayPatterns") - if not day_patterns: - return None - - today_data = day_patterns.get("today") - if not today_data: - return None - - segments: list[dict] | None = today_data.get("segments") - if not segments: - return None - - from homeassistant.util.dt import parse_datetime # noqa: PLC0415 - - now = self.coordinator.time.now() - current_segment: dict | None = None - for segment in segments: - seg_start_str: str | None = segment.get("start") - if not seg_start_str: - continue - seg_start = parse_datetime(seg_start_str) - if seg_start is not None and now >= seg_start: - current_segment = segment - - if current_segment is None: - return None - - return current_segment.get("type") + return segments[current_index].get("type") def get_next_price_phase_value(self) -> str | None: """ Get the next intra-day price phase (rising / falling / flat). - Finds the monotone segment in today's day-pattern that starts after - the current segment and returns its type string. + Finds the monotone segment that starts after the current segment. + If the current segment is the last of today, falls back to the first + segment of tomorrow (if available). Returns: "rising", "falling", or "flat", or None if no next segment exists. """ - if not self.coordinator.data: + current_index, segments = self._find_current_segment() + if current_index is None or segments is None: return None + # Next segment in today + if current_index + 1 < len(segments): + return segments[current_index + 1].get("type") + + # Fall back to tomorrow's first segment + if not self.coordinator.data: + return None day_patterns = self.coordinator.data.get("dayPatterns") if not day_patterns: return None - - today_data = day_patterns.get("today") - if not today_data: + tomorrow_data = day_patterns.get("tomorrow") + if not tomorrow_data: return None - - segments: list[dict] | None = today_data.get("segments") - if not segments: + tomorrow_segments: list[dict] = tomorrow_data.get("segments", []) + if not tomorrow_segments: return None - - from homeassistant.util.dt import parse_datetime # noqa: PLC0415 - - now = self.coordinator.time.now() - current_index: int | None = None - for i, segment in enumerate(segments): - seg_start_str: str | None = segment.get("start") - if not seg_start_str: - continue - seg_start = parse_datetime(seg_start_str) - if seg_start is not None and now >= seg_start: - current_index = i - - if current_index is None or current_index + 1 >= len(segments): - return None - - return segments[current_index + 1].get("type") + return tomorrow_segments[0].get("type") def _find_current_segment(self) -> tuple[int, list[dict]] | tuple[None, None]: """ diff --git a/custom_components/tibber_prices/sensor/definitions.py b/custom_components/tibber_prices/sensor/definitions.py index 641213f..519092e 100644 --- a/custom_components/tibber_prices/sensor/definitions.py +++ b/custom_components/tibber_prices/sensor/definitions.py @@ -1045,6 +1045,12 @@ DAY_PATTERN_SENSORS = ( state_class=None, entity_registry_enabled_default=False, ), +) + +# 8b. PRICE PHASE SENSORS (current/next intra-day price phase classification) +# ---------------------------------------------------------------------------- + +PRICE_PHASE_SENSORS = ( SensorEntityDescription( key="current_price_phase", translation_key="current_price_phase", @@ -1065,7 +1071,7 @@ DAY_PATTERN_SENSORS = ( ), ) -# 8b. PRICE PHASE TIMING SENSORS (current phase duration/progress + next-phase-by-type) +# 8c. PRICE PHASE TIMING SENSORS (current phase duration/progress + next-phase-by-type) # ---------------------------------------------------------------------------- # # When current phase is active: @@ -1370,6 +1376,7 @@ ENTITY_DESCRIPTIONS = ( *BEST_PRICE_TIMING_SENSORS, *PEAK_PRICE_TIMING_SENSORS, *DAY_PATTERN_SENSORS, + *PRICE_PHASE_SENSORS, *PRICE_PHASE_TIMING_SENSORS, *DIAGNOSTIC_SENSORS, )