refactor(attributes): streamline phase type retrieval and attribute building

Consolidate logic for determining current price phase and associated attributes by introducing shared helper functions. This enhances code maintainability and reduces duplication across components.

Impact: Improved clarity and efficiency in price phase handling for users.
This commit is contained in:
Julian Pawlowski 2026-04-17 08:52:17 +00:00
parent 752a0c5dbc
commit c3173a16d6
6 changed files with 158 additions and 148 deletions

View file

@ -7,6 +7,7 @@ from typing import TYPE_CHECKING
from custom_components.tibber_prices.const import get_display_unit_factor from custom_components.tibber_prices.const import get_display_unit_factor
from custom_components.tibber_prices.coordinator.helpers import get_intervals_for_day_offsets from custom_components.tibber_prices.coordinator.helpers import get_intervals_for_day_offsets
from custom_components.tibber_prices.entity_utils import add_icon_color_attribute from custom_components.tibber_prices.entity_utils import add_icon_color_attribute
from custom_components.tibber_prices.sensor.attributes.metadata import _find_current_segment_in_data
# Constants for price display conversion # Constants for price display conversion
_SUBUNIT_FACTOR = 100 # Conversion factor for subunit currency (ct/øre) _SUBUNIT_FACTOR = 100 # Conversion factor for subunit currency (ct/øre)
@ -29,8 +30,7 @@ def get_current_phase_type(coordinator_data: dict, *, time: TibberPricesTimeServ
""" """
Return the type of the currently active intra-day price phase. Return the type of the currently active intra-day price phase.
Walks today's segments and returns the type ("rising", "falling", or "flat") Delegates to the shared segment finder in sensor/attributes/metadata.py.
of the last segment whose start time is now.
Args: Args:
coordinator_data: The coordinator's data dict. coordinator_data: The coordinator's data dict.
@ -42,32 +42,39 @@ def get_current_phase_type(coordinator_data: dict, *, time: TibberPricesTimeServ
""" """
if not coordinator_data: if not coordinator_data:
return None return None
current_index, segments = _find_current_segment_in_data(coordinator_data, time=time)
if current_index is None or segments is None:
return None
return segments[current_index].get("type")
day_patterns = coordinator_data.get("dayPatterns")
if not day_patterns: def get_phase_attributes(coordinator_data: dict, *, time: TibberPricesTimeService) -> dict | None:
"""
Build start/end attributes for in_*_price_phase binary sensors.
Args:
coordinator_data: The coordinator's data dict.
time: TibberPricesTimeService instance.
Returns:
Dict with start and end timestamps, or None if unavailable.
"""
if not coordinator_data:
return None
current_index, segments = _find_current_segment_in_data(coordinator_data, time=time)
if current_index is None or segments is None:
return None return None
today_data = day_patterns.get("today") segment = segments[current_index]
if not today_data: attrs: dict = {}
return None
segments: list[dict] | None = today_data.get("segments") if start := segment.get("start"):
if not segments: attrs["start"] = start
return None if end := segment.get("end"):
attrs["end"] = end
from homeassistant.util.dt import parse_datetime # noqa: PLC0415 return attrs or None
now = time.now()
current_type: str | None = None
for segment in segments:
seg_start_str: str | None = segment.get("start")
if not seg_start_str:
continue
seg_start = parse_datetime(seg_start_str)
if seg_start is not None and now >= seg_start:
current_type = segment.get("type")
return current_type
def get_tomorrow_data_available_attributes( def get_tomorrow_data_available_attributes(

View file

@ -18,6 +18,7 @@ from .attributes import (
build_async_extra_state_attributes, build_async_extra_state_attributes,
build_sync_extra_state_attributes, build_sync_extra_state_attributes,
get_current_phase_type, get_current_phase_type,
get_phase_attributes,
get_price_intervals_attributes, get_price_intervals_attributes,
get_tomorrow_data_available_attributes, get_tomorrow_data_available_attributes,
) )
@ -316,6 +317,9 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity, RestoreEn
if key == "tomorrow_data_available": if key == "tomorrow_data_available":
return self._get_tomorrow_data_available_attributes() return self._get_tomorrow_data_available_attributes()
if key in ("in_rising_price_phase", "in_falling_price_phase", "in_flat_price_phase"):
return get_phase_attributes(self.coordinator.data, time=self.coordinator.time)
return None return None
@callback @callback

View file

@ -11,6 +11,54 @@ if TYPE_CHECKING:
from custom_components.tibber_prices.coordinator.time_service import TibberPricesTimeService from custom_components.tibber_prices.coordinator.time_service import TibberPricesTimeService
def _find_current_segment_in_data(
coordinator_data: dict,
*,
time: TibberPricesTimeService,
) -> tuple[int, list[dict[str, Any]]] | tuple[None, None]:
"""
Find the currently active segment in today's day pattern data.
Shared helper for all phase-related attribute/value lookups.
Args:
coordinator_data: Coordinator's data dict (must not be None).
time: TibberPricesTimeService instance.
Returns:
Tuple of (current_index, segments) or (None, None) if unavailable.
"""
day_patterns = coordinator_data.get("dayPatterns")
if not day_patterns:
return None, None
today_data: dict[str, Any] | None = day_patterns.get("today")
if not today_data:
return None, None
segments: list[dict[str, Any]] | None = today_data.get("segments")
if not segments:
return None, None
from homeassistant.util.dt import parse_datetime # noqa: PLC0415
now = time.now()
current_index: int | None = None
for i, segment in enumerate(segments):
seg_start_str: str | None = segment.get("start")
if not seg_start_str:
continue
seg_start = parse_datetime(seg_start_str)
if seg_start is not None and now >= seg_start:
current_index = i
if current_index is None:
return None, None
return current_index, segments
def get_current_interval_data( def get_current_interval_data(
coordinator: TibberPricesDataUpdateCoordinator, coordinator: TibberPricesDataUpdateCoordinator,
*, *,
@ -131,31 +179,8 @@ def get_current_price_phase_attributes(
if not coordinator.data: if not coordinator.data:
return None return None
day_patterns = coordinator.data.get("dayPatterns") current_index, segments = _find_current_segment_in_data(coordinator.data, time=time)
if not day_patterns: if current_index is None or segments is None:
return None
today_data: dict[str, Any] | None = day_patterns.get("today")
if not today_data:
return None
segments: list[dict[str, Any]] | None = today_data.get("segments")
if not segments:
return None
from homeassistant.util.dt import parse_datetime # noqa: PLC0415
now = time.now()
current_index: int | None = None
for i, segment in enumerate(segments):
seg_start_str: str | None = segment.get("start")
if not seg_start_str:
continue
seg_start = parse_datetime(seg_start_str)
if seg_start is not None and now >= seg_start:
current_index = i
if current_index is None:
return None return None
seg = segments[current_index] seg = segments[current_index]
@ -180,8 +205,8 @@ def get_next_price_phase_attributes(
""" """
Build attributes for the next_price_phase sensor. Build attributes for the next_price_phase sensor.
Returns details of the segment that follows the currently active one, Returns details of the segment that follows the currently active one.
including its start time (useful for scheduling automations). If today's current segment is the last, falls back to tomorrow's first segment.
Args: Args:
coordinator: The data update coordinator. coordinator: The data update coordinator.
@ -194,41 +219,42 @@ def get_next_price_phase_attributes(
if not coordinator.data: if not coordinator.data:
return None return None
current_index, segments = _find_current_segment_in_data(coordinator.data, time=time)
if current_index is None or segments is None:
return None
# Next segment in today
if current_index + 1 < len(segments):
next_seg = segments[current_index + 1]
attrs: dict[str, Any] = {
"start": next_seg.get("start"),
"end": next_seg.get("end"),
"price_min": next_seg.get("price_min"),
"price_max": next_seg.get("price_max"),
"price_mean": next_seg.get("price_mean"),
"segment_index": current_index + 1,
"segment_count": len(segments),
}
return attrs
# Fall back to tomorrow's first segment
day_patterns = coordinator.data.get("dayPatterns") day_patterns = coordinator.data.get("dayPatterns")
if not day_patterns: if not day_patterns:
return None return None
tomorrow_data = day_patterns.get("tomorrow")
today_data: dict[str, Any] | None = day_patterns.get("today") if not tomorrow_data:
if not today_data:
return None return None
tomorrow_segments: list[dict[str, Any]] = tomorrow_data.get("segments", [])
segments: list[dict[str, Any]] | None = today_data.get("segments") if not tomorrow_segments:
if not segments:
return None return None
next_seg = tomorrow_segments[0]
from homeassistant.util.dt import parse_datetime # noqa: PLC0415 return {
now = time.now()
current_index: int | None = None
for i, segment in enumerate(segments):
seg_start_str: str | None = segment.get("start")
if not seg_start_str:
continue
seg_start = parse_datetime(seg_start_str)
if seg_start is not None and now >= seg_start:
current_index = i
if current_index is None or current_index + 1 >= len(segments):
return None
next_seg = segments[current_index + 1]
attrs: dict[str, Any] = {
"start": next_seg.get("start"), "start": next_seg.get("start"),
"end": next_seg.get("end"), "end": next_seg.get("end"),
"price_min": next_seg.get("price_min"), "price_min": next_seg.get("price_min"),
"price_max": next_seg.get("price_max"), "price_max": next_seg.get("price_max"),
"price_mean": next_seg.get("price_mean"), "price_mean": next_seg.get("price_mean"),
"segment_index": current_index + 1, "segment_index": 0,
"segment_count": len(segments), "segment_count": len(tomorrow_segments),
"is_tomorrow": True,
} }
return attrs

View file

@ -12,6 +12,22 @@ if TYPE_CHECKING:
# Timer #3 triggers every 30 seconds # Timer #3 triggers every 30 seconds
TIMER_30_SEC_BOUNDARY = 30 TIMER_30_SEC_BOUNDARY = 30
# Phase timing sensor keys — allocated once at module level
_PHASE_TIMING_KEYS = frozenset(
{
"current_price_phase_end_time",
"current_price_phase_remaining_minutes",
"current_price_phase_duration",
"current_price_phase_progress",
"next_rising_phase_start_time",
"next_falling_phase_start_time",
"next_flat_phase_start_time",
"next_rising_phase_in_minutes",
"next_falling_phase_in_minutes",
"next_flat_phase_in_minutes",
}
)
def _hours_to_minutes(state_value: Any) -> int | None: def _hours_to_minutes(state_value: Any) -> int | None:
"""Convert hour-based state back to rounded minutes for attributes.""" """Convert hour-based state back to rounded minutes for attributes."""
@ -34,20 +50,6 @@ def _is_timing_or_volatility_sensor(key: str) -> bool:
): ):
return True return True
# price phase timing sensors # price phase timing sensors
_PHASE_TIMING_KEYS = frozenset(
{
"current_price_phase_end_time",
"current_price_phase_remaining_minutes",
"current_price_phase_duration",
"current_price_phase_progress",
"next_rising_phase_start_time",
"next_falling_phase_start_time",
"next_flat_phase_start_time",
"next_rising_phase_in_minutes",
"next_falling_phase_in_minutes",
"next_flat_phase_in_minutes",
}
)
return key in _PHASE_TIMING_KEYS return key in _PHASE_TIMING_KEYS

View file

@ -149,80 +149,44 @@ class TibberPricesMetadataCalculator(TibberPricesBaseCalculator):
"rising", "falling", or "flat", or None if data is unavailable. "rising", "falling", or "flat", or None if data is unavailable.
""" """
if not self.coordinator.data: current_index, segments = self._find_current_segment()
if current_index is None or segments is None:
return None return None
return segments[current_index].get("type")
day_patterns = self.coordinator.data.get("dayPatterns")
if not day_patterns:
return None
today_data = day_patterns.get("today")
if not today_data:
return None
segments: list[dict] | None = today_data.get("segments")
if not segments:
return None
from homeassistant.util.dt import parse_datetime # noqa: PLC0415
now = self.coordinator.time.now()
current_segment: dict | None = None
for segment in segments:
seg_start_str: str | None = segment.get("start")
if not seg_start_str:
continue
seg_start = parse_datetime(seg_start_str)
if seg_start is not None and now >= seg_start:
current_segment = segment
if current_segment is None:
return None
return current_segment.get("type")
def get_next_price_phase_value(self) -> str | None: def get_next_price_phase_value(self) -> str | None:
""" """
Get the next intra-day price phase (rising / falling / flat). Get the next intra-day price phase (rising / falling / flat).
Finds the monotone segment in today's day-pattern that starts after Finds the monotone segment that starts after the current segment.
the current segment and returns its type string. If the current segment is the last of today, falls back to the first
segment of tomorrow (if available).
Returns: Returns:
"rising", "falling", or "flat", or None if no next segment exists. "rising", "falling", or "flat", or None if no next segment exists.
""" """
if not self.coordinator.data: current_index, segments = self._find_current_segment()
if current_index is None or segments is None:
return None return None
# Next segment in today
if current_index + 1 < len(segments):
return segments[current_index + 1].get("type")
# Fall back to tomorrow's first segment
if not self.coordinator.data:
return None
day_patterns = self.coordinator.data.get("dayPatterns") day_patterns = self.coordinator.data.get("dayPatterns")
if not day_patterns: if not day_patterns:
return None return None
tomorrow_data = day_patterns.get("tomorrow")
today_data = day_patterns.get("today") if not tomorrow_data:
if not today_data:
return None return None
tomorrow_segments: list[dict] = tomorrow_data.get("segments", [])
segments: list[dict] | None = today_data.get("segments") if not tomorrow_segments:
if not segments:
return None return None
return tomorrow_segments[0].get("type")
from homeassistant.util.dt import parse_datetime # noqa: PLC0415
now = self.coordinator.time.now()
current_index: int | None = None
for i, segment in enumerate(segments):
seg_start_str: str | None = segment.get("start")
if not seg_start_str:
continue
seg_start = parse_datetime(seg_start_str)
if seg_start is not None and now >= seg_start:
current_index = i
if current_index is None or current_index + 1 >= len(segments):
return None
return segments[current_index + 1].get("type")
def _find_current_segment(self) -> tuple[int, list[dict]] | tuple[None, None]: def _find_current_segment(self) -> tuple[int, list[dict]] | tuple[None, None]:
""" """

View file

@ -1045,6 +1045,12 @@ DAY_PATTERN_SENSORS = (
state_class=None, state_class=None,
entity_registry_enabled_default=False, entity_registry_enabled_default=False,
), ),
)
# 8b. PRICE PHASE SENSORS (current/next intra-day price phase classification)
# ----------------------------------------------------------------------------
PRICE_PHASE_SENSORS = (
SensorEntityDescription( SensorEntityDescription(
key="current_price_phase", key="current_price_phase",
translation_key="current_price_phase", translation_key="current_price_phase",
@ -1065,7 +1071,7 @@ DAY_PATTERN_SENSORS = (
), ),
) )
# 8b. PRICE PHASE TIMING SENSORS (current phase duration/progress + next-phase-by-type) # 8c. PRICE PHASE TIMING SENSORS (current phase duration/progress + next-phase-by-type)
# ---------------------------------------------------------------------------- # ----------------------------------------------------------------------------
# #
# When current phase is active: # When current phase is active:
@ -1370,6 +1376,7 @@ ENTITY_DESCRIPTIONS = (
*BEST_PRICE_TIMING_SENSORS, *BEST_PRICE_TIMING_SENSORS,
*PEAK_PRICE_TIMING_SENSORS, *PEAK_PRICE_TIMING_SENSORS,
*DAY_PATTERN_SENSORS, *DAY_PATTERN_SENSORS,
*PRICE_PHASE_SENSORS,
*PRICE_PHASE_TIMING_SENSORS, *PRICE_PHASE_TIMING_SENSORS,
*DIAGNOSTIC_SENSORS, *DIAGNOSTIC_SENSORS,
) )