hass.tibber_prices/custom_components/tibber_prices/sensor/attributes.py
Julian Pawlowski 76dc488bb5 feat(sensors): add momentum-based trend detection with two new sensors
Added intelligent price trend analysis combining historical momentum
(weighted 1h lookback) with future outlook for more accurate trend
recognition. Introduced two complementary sensors for comprehensive
trend monitoring.

New sensors:
- current_price_trend: Shows active trend direction with duration
- next_price_trend_change: Predicts when trend will reverse

Momentum analysis (historical perspective):
- Weighted 1h lookback (4 × 15-min intervals)
- Linear weight progression [0.5, 0.75, 1.0, 1.25]
- ±3% threshold for momentum classification
- Recognizes ongoing trends earlier than future-only analysis

Two-phase trend calculation:
- Phase 1: Calculate momentum from weighted trailing average
- Phase 2: Validate with volatility-adaptive future comparison
- Combines both for final trend determination (rising/falling/stable)
- Centralized in _calculate_trend_info() with 60s cache

Volatility-adaptive thresholds:
- Existing trend sensors (1h-12h) now use adaptive thresholds
- calculate_price_trend() adjusted by market volatility:
  * LOW volatility (<15% CV): factor 0.6 → more sensitive (e.g., 3%→1.8%)
  * MODERATE volatility (15-30%): factor 1.0 → baseline (3%)
  * HIGH volatility (≥30%): factor 1.4 → less sensitive (e.g., 3%→4.2%)
- Uses same coefficient of variation as volatility sensors
- Ensures mathematical consistency across integration

Default threshold reduction:
- Rising/falling thresholds: 5% → 3% (more responsive)
- Momentum-based detection enables lower thresholds without noise
- Adaptive adjustment compensates during high volatility

Architectural improvements:
- Centralized calculation: Single source of truth for both sensors
- Eliminates Henne-Ei problem (duplicate calculations)
- 60-second cache per coordinator update
- Shared helper methods: _calculate_momentum(), _combine_momentum_with_future()

Translation updates (all 5 languages):
- Documented momentum feature in custom_translations (de/en/nb/nl/sv)
- Explained "recognizes ongoing trends earlier" advantage
- Added sensor names and state options to standard translations
- Updated volatility threshold descriptions (clarify usage by trend sensors)

Files changed:
- custom_components/tibber_prices/sensor/core.py (930 lines added)
  * New: _calculate_momentum(), _combine_momentum_with_future()
  * New: _calculate_trend_info() (centralized with cache)
  * New: _get_current_trend_value(), _get_next_trend_change_value()
  * Modified: _get_price_trend_value() (volatility-adaptive thresholds)
- custom_components/tibber_prices/sensor/definitions.py
  * Added: current_price_trend (ENUM sensor)
  * Added: next_price_trend_change (TIMESTAMP sensor)
- custom_components/tibber_prices/sensor/attributes.py
  * New: _add_cached_trend_attributes() helper
  * Support for current_trend_attributes, trend_change_attributes
- custom_components/tibber_prices/price_utils.py (178 lines added)
  * New: _calculate_lookahead_volatility_factor()
  * Modified: calculate_price_trend() with volatility adjustment
  * Added: VOLATILITY_FACTOR_* constants (0.6/1.0/1.4)
- custom_components/tibber_prices/entity_utils/icons.py
  * Added: Dynamic icon handling for next_price_trend_change
- custom_components/tibber_prices/const.py
  * Changed: DEFAULT_PRICE_TREND_THRESHOLD_RISING/FALLING (5→3%)
- custom_components/tibber_prices/translations/*.json (5 files)
  * Added: Sensor names, state options, descriptions
- custom_components/tibber_prices/custom_translations/*.json (5 files)
  * Added: Long descriptions with momentum feature explanation

Impact: Users get significantly more accurate trend detection that
understands they're ALREADY in a trend, not just predicting future
changes. Momentum-based approach recognizes ongoing movements 15-60
minutes earlier. Adaptive thresholds prevent false signals during
volatile periods. Two complementary sensors enable both status display
(current trend) and event-based automation (when will it change).
Perfect for use cases like "charge EV when next trend change shows
falling prices" or dashboard badges showing "Rising for 2.5h".
2025-11-16 12:49:43 +00:00

967 lines
34 KiB
Python

"""
Attribute builders for Tibber Prices sensors.
This module contains all the attribute building logic extracted from TibberPricesSensor.
Each function takes explicit parameters instead of accessing instance variables.
"""
from __future__ import annotations
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Any
from custom_components.tibber_prices.const import (
PRICE_LEVEL_MAPPING,
PRICE_RATING_MAPPING,
)
from custom_components.tibber_prices.entity_utils import add_icon_color_attribute
from custom_components.tibber_prices.price_utils import (
MINUTES_PER_INTERVAL,
calculate_volatility_level,
find_price_data_for_interval,
)
from homeassistant.const import PERCENTAGE
from homeassistant.util import dt as dt_util
if TYPE_CHECKING:
from custom_components.tibber_prices.coordinator import (
TibberPricesDataUpdateCoordinator,
)
# Constants
MAX_FORECAST_INTERVALS = 8 # Show up to 8 future intervals (2 hours with 15-min intervals)
def _is_timing_or_volatility_sensor(key: str) -> bool:
"""Check if sensor is a timing or volatility sensor."""
return key.endswith("_volatility") or (
key.startswith(("best_price_", "peak_price_"))
and any(
suffix in key
for suffix in [
"end_time",
"remaining_minutes",
"progress",
"next_start_time",
"next_in_minutes",
]
)
)
def _add_timing_or_volatility_attributes(
attributes: dict,
key: str,
cached_data: dict,
native_value: Any = None,
) -> None:
"""Add attributes for timing or volatility sensors."""
if key.endswith("_volatility"):
add_volatility_attributes(attributes=attributes, cached_data=cached_data)
else:
add_period_timing_attributes(attributes=attributes, key=key, state_value=native_value)
def _add_cached_trend_attributes(attributes: dict, key: str, cached_data: dict) -> None:
"""Add cached trend attributes if available."""
if key.startswith("price_trend_") and cached_data.get("trend_attributes"):
attributes.update(cached_data["trend_attributes"])
elif key == "current_price_trend" and cached_data.get("current_trend_attributes"):
attributes.update(cached_data["current_trend_attributes"])
elif key == "next_price_trend_change" and cached_data.get("trend_change_attributes"):
attributes.update(cached_data["trend_change_attributes"])
def build_sensor_attributes(
key: str,
coordinator: TibberPricesDataUpdateCoordinator,
native_value: Any,
cached_data: dict,
) -> dict | None:
"""
Build attributes for a sensor based on its key.
Args:
key: The sensor entity key
coordinator: The data update coordinator
native_value: The current native value of the sensor
cached_data: Dictionary containing cached sensor data
(_last_extreme_interval, _trend_attributes, _volatility_attributes, etc.)
Returns:
Dictionary of attributes or None if no attributes should be added
"""
if not coordinator.data:
return None
try:
attributes: dict[str, Any] = {}
# For trend sensors, use cached attributes
_add_cached_trend_attributes(attributes, key, cached_data)
# Group sensors by type and delegate to specific handlers
if key in [
"current_interval_price",
"current_interval_price_level",
"next_interval_price",
"previous_interval_price",
"current_hour_average_price",
"next_hour_average_price",
"next_interval_price_level",
"previous_interval_price_level",
"current_hour_price_level",
"next_hour_price_level",
"next_interval_price_rating",
"previous_interval_price_rating",
"current_hour_price_rating",
"next_hour_price_rating",
]:
add_current_interval_price_attributes(
attributes=attributes,
key=key,
coordinator=coordinator,
native_value=native_value,
cached_data=cached_data,
)
elif key in [
"trailing_price_average",
"leading_price_average",
"trailing_price_min",
"trailing_price_max",
"leading_price_min",
"leading_price_max",
]:
add_average_price_attributes(attributes=attributes, key=key, coordinator=coordinator)
elif key.startswith("next_avg_"):
add_next_avg_attributes(attributes=attributes, key=key, coordinator=coordinator)
elif any(
pattern in key
for pattern in [
"_price_today",
"_price_tomorrow",
"_price_yesterday",
"yesterday_price_level",
"today_price_level",
"tomorrow_price_level",
"yesterday_price_rating",
"today_price_rating",
"tomorrow_price_rating",
"rating",
"data_timestamp",
]
):
add_statistics_attributes(
attributes=attributes,
key=key,
coordinator=coordinator,
cached_data=cached_data,
)
elif key == "price_forecast":
add_price_forecast_attributes(attributes=attributes, coordinator=coordinator)
elif _is_timing_or_volatility_sensor(key):
_add_timing_or_volatility_attributes(attributes, key, cached_data, native_value)
# For current_interval_price_level, add the original level as attribute
if key == "current_interval_price_level" and cached_data.get("last_price_level") is not None:
attributes["level_id"] = cached_data["last_price_level"]
# Add icon_color for daily level and rating sensors (uses native_value)
if key in [
"yesterday_price_level",
"today_price_level",
"tomorrow_price_level",
"yesterday_price_rating",
"today_price_rating",
"tomorrow_price_rating",
]:
add_icon_color_attribute(attributes, key=key, state_value=native_value)
except (KeyError, ValueError, TypeError) as ex:
coordinator.logger.exception(
"Error getting sensor attributes",
extra={
"error": str(ex),
"entity": key,
},
)
return None
else:
return attributes if attributes else None
def add_current_interval_price_attributes(
attributes: dict,
key: str,
coordinator: TibberPricesDataUpdateCoordinator,
native_value: Any,
cached_data: dict,
) -> None:
"""
Add attributes for current interval price sensors.
Args:
attributes: Dictionary to add attributes to
key: The sensor entity key
coordinator: The data update coordinator
native_value: The current native value of the sensor
cached_data: Dictionary containing cached sensor data
"""
price_info = coordinator.data.get("priceInfo", {}) if coordinator.data else {}
now = dt_util.now()
# Determine which interval to use based on sensor type
next_interval_sensors = [
"next_interval_price",
"next_interval_price_level",
"next_interval_price_rating",
]
previous_interval_sensors = [
"previous_interval_price",
"previous_interval_price_level",
"previous_interval_price_rating",
]
next_hour_sensors = [
"next_hour_average_price",
"next_hour_price_level",
"next_hour_price_rating",
]
current_hour_sensors = [
"current_hour_average_price",
"current_hour_price_level",
"current_hour_price_rating",
]
# Set timestamp and interval data based on sensor type
interval_data = None
if key in next_interval_sensors:
target_time = now + timedelta(minutes=MINUTES_PER_INTERVAL)
interval_data = find_price_data_for_interval(price_info, target_time)
attributes["timestamp"] = interval_data["startsAt"] if interval_data else None
elif key in previous_interval_sensors:
target_time = now - timedelta(minutes=MINUTES_PER_INTERVAL)
interval_data = find_price_data_for_interval(price_info, target_time)
attributes["timestamp"] = interval_data["startsAt"] if interval_data else None
elif key in next_hour_sensors:
target_time = now + timedelta(hours=1)
interval_data = find_price_data_for_interval(price_info, target_time)
attributes["timestamp"] = interval_data["startsAt"] if interval_data else None
elif key in current_hour_sensors:
current_interval_data = get_current_interval_data(coordinator)
attributes["timestamp"] = current_interval_data["startsAt"] if current_interval_data else None
else:
current_interval_data = get_current_interval_data(coordinator)
interval_data = current_interval_data # Use current_interval_data as interval_data for current_interval_price
attributes["timestamp"] = current_interval_data["startsAt"] if current_interval_data else None
# Add icon_color for price sensors (based on their price level)
if key in ["current_interval_price", "next_interval_price", "previous_interval_price"]:
# For interval-based price sensors, get level from interval_data
if interval_data and "level" in interval_data:
level = interval_data["level"]
add_icon_color_attribute(attributes, key="price_level", state_value=level)
elif key in ["current_hour_average_price", "next_hour_average_price"]:
# For hour-based price sensors, get level from cached_data
level = cached_data.get("rolling_hour_level")
if level:
add_icon_color_attribute(attributes, key="price_level", state_value=level)
# Add price level attributes for all level sensors
add_level_attributes_for_sensor(
attributes=attributes,
key=key,
interval_data=interval_data,
coordinator=coordinator,
native_value=native_value,
)
# Add price rating attributes for all rating sensors
add_rating_attributes_for_sensor(
attributes=attributes,
key=key,
interval_data=interval_data,
coordinator=coordinator,
native_value=native_value,
)
def add_level_attributes_for_sensor(
attributes: dict,
key: str,
interval_data: dict | None,
coordinator: TibberPricesDataUpdateCoordinator,
native_value: Any,
) -> None:
"""
Add price level attributes based on sensor type.
Args:
attributes: Dictionary to add attributes to
key: The sensor entity key
interval_data: Interval data for next/previous sensors
coordinator: The data update coordinator
native_value: The current native value of the sensor
"""
# For interval-based level sensors (next/previous), use interval data
if key in ["next_interval_price_level", "previous_interval_price_level"]:
if interval_data and "level" in interval_data:
add_price_level_attributes(attributes, interval_data["level"])
# For hour-aggregated level sensors, use native_value
elif key in ["current_hour_price_level", "next_hour_price_level"]:
level_value = native_value
if level_value and isinstance(level_value, str):
add_price_level_attributes(attributes, level_value.upper())
# For current price level sensor
elif key == "current_interval_price_level":
current_interval_data = get_current_interval_data(coordinator)
if current_interval_data and "level" in current_interval_data:
add_price_level_attributes(attributes, current_interval_data["level"])
def add_price_level_attributes(attributes: dict, level: str) -> None:
"""
Add price level specific attributes.
Args:
attributes: Dictionary to add attributes to
level: The price level value (e.g., VERY_CHEAP, NORMAL, etc.)
"""
if level in PRICE_LEVEL_MAPPING:
attributes["level_value"] = PRICE_LEVEL_MAPPING[level]
attributes["level_id"] = level
# Add icon_color for dynamic styling
add_icon_color_attribute(attributes, key="price_level", state_value=level)
def add_rating_attributes_for_sensor(
attributes: dict,
key: str,
interval_data: dict | None,
coordinator: TibberPricesDataUpdateCoordinator,
native_value: Any,
) -> None:
"""
Add price rating attributes based on sensor type.
Args:
attributes: Dictionary to add attributes to
key: The sensor entity key
interval_data: Interval data for next/previous sensors
coordinator: The data update coordinator
native_value: The current native value of the sensor
"""
# For interval-based rating sensors (next/previous), use interval data
if key in ["next_interval_price_rating", "previous_interval_price_rating"]:
if interval_data and "rating_level" in interval_data:
add_price_rating_attributes(attributes, interval_data["rating_level"])
# For hour-aggregated rating sensors, use native_value
elif key in ["current_hour_price_rating", "next_hour_price_rating"]:
rating_value = native_value
if rating_value and isinstance(rating_value, str):
add_price_rating_attributes(attributes, rating_value.upper())
# For current price rating sensor
elif key == "current_interval_price_rating":
current_interval_data = get_current_interval_data(coordinator)
if current_interval_data and "rating_level" in current_interval_data:
add_price_rating_attributes(attributes, current_interval_data["rating_level"])
def add_price_rating_attributes(attributes: dict, rating: str) -> None:
"""
Add price rating specific attributes.
Args:
attributes: Dictionary to add attributes to
rating: The price rating value (e.g., LOW, NORMAL, HIGH)
"""
if rating in PRICE_RATING_MAPPING:
attributes["rating_value"] = PRICE_RATING_MAPPING[rating]
attributes["rating_id"] = rating
# Add icon_color for dynamic styling
add_icon_color_attribute(attributes, key="price_rating", state_value=rating)
def add_statistics_attributes(
attributes: dict,
key: str,
coordinator: TibberPricesDataUpdateCoordinator,
cached_data: dict,
) -> None:
"""
Add attributes for statistics and rating sensors.
Args:
attributes: Dictionary to add attributes to
key: The sensor entity key
coordinator: The data update coordinator
cached_data: Dictionary containing cached sensor data
"""
price_info = coordinator.data.get("priceInfo", {})
now = dt_util.now()
if key == "data_timestamp":
# For data_timestamp sensor, use the latest timestamp from cached_data
latest_timestamp = cached_data.get("data_timestamp")
if latest_timestamp:
attributes["timestamp"] = latest_timestamp.isoformat()
elif key == "current_interval_price_rating":
interval_data = find_price_data_for_interval(price_info, now)
attributes["timestamp"] = interval_data["startsAt"] if interval_data else None
if cached_data.get("last_rating_difference") is not None:
attributes["diff_" + PERCENTAGE] = cached_data["last_rating_difference"]
if cached_data.get("last_rating_level") is not None:
attributes["level_id"] = cached_data["last_rating_level"]
attributes["level_value"] = PRICE_RATING_MAPPING.get(
cached_data["last_rating_level"], cached_data["last_rating_level"]
)
elif key in [
"lowest_price_today",
"highest_price_today",
"lowest_price_tomorrow",
"highest_price_tomorrow",
]:
# Use the timestamp from the interval that has the extreme price
if cached_data.get("last_extreme_interval"):
attributes["timestamp"] = cached_data["last_extreme_interval"].get("startsAt")
else:
# Fallback: use the first timestamp of the appropriate day
_add_fallback_timestamp(attributes, key, price_info)
elif key in [
"yesterday_price_level",
"today_price_level",
"tomorrow_price_level",
"yesterday_price_rating",
"today_price_rating",
"tomorrow_price_rating",
]:
# Daily aggregated level/rating sensors - add timestamp
day_key = _get_day_key_from_sensor_key(key)
day_data = price_info.get(day_key, [])
if day_data:
# Use first timestamp of the day (00:00)
attributes["timestamp"] = day_data[0].get("startsAt")
else:
# Fallback: use the first timestamp of the appropriate day
_add_fallback_timestamp(attributes, key, price_info)
def _get_day_key_from_sensor_key(key: str) -> str:
"""
Extract day key (yesterday/today/tomorrow) from sensor key.
Args:
key: The sensor entity key
Returns:
Day key: "yesterday", "today", or "tomorrow"
"""
if "yesterday" in key:
return "yesterday"
if "tomorrow" in key:
return "tomorrow"
return "today"
def _add_fallback_timestamp(attributes: dict, key: str, price_info: dict) -> None:
"""
Add fallback timestamp to attributes based on the day in the sensor key.
Args:
attributes: Dictionary to add timestamp to
key: The sensor entity key
price_info: Price info dictionary from coordinator data
"""
day_key = _get_day_key_from_sensor_key(key)
day_data = price_info.get(day_key, [])
if day_data:
attributes["timestamp"] = day_data[0].get("startsAt")
def add_average_price_attributes(
attributes: dict,
key: str,
coordinator: TibberPricesDataUpdateCoordinator,
) -> None:
"""
Add attributes for trailing and leading average/min/max price sensors.
Args:
attributes: Dictionary to add attributes to
key: The sensor entity key
coordinator: The data update coordinator
"""
now = dt_util.now()
# Determine if this is trailing or leading
is_trailing = "trailing" in key
# Get all price intervals
price_info = coordinator.data.get("priceInfo", {})
yesterday_prices = price_info.get("yesterday", [])
today_prices = price_info.get("today", [])
tomorrow_prices = price_info.get("tomorrow", [])
all_prices = yesterday_prices + today_prices + tomorrow_prices
if not all_prices:
return
# Calculate the time window
if is_trailing:
window_start = now - timedelta(hours=24)
window_end = now
else:
window_start = now
window_end = now + timedelta(hours=24)
# Find all intervals in the window
intervals_in_window = []
extreme_interval = None # Track interval with min/max for min/max sensors
is_min_max_sensor = "min" in key or "max" in key
for price_data in all_prices:
starts_at = dt_util.parse_datetime(price_data["startsAt"])
if starts_at is None:
continue
starts_at = dt_util.as_local(starts_at)
if window_start <= starts_at < window_end:
intervals_in_window.append(price_data)
# Track extreme interval for min/max sensors
if is_min_max_sensor:
extreme_interval = _update_extreme_interval(extreme_interval, price_data, key)
# Add timestamp attribute
if intervals_in_window:
# For min/max sensors: use the timestamp of the interval with extreme price
# For average sensors: use first interval in the window
if extreme_interval and is_min_max_sensor:
attributes["timestamp"] = extreme_interval.get("startsAt")
else:
attributes["timestamp"] = intervals_in_window[0].get("startsAt")
attributes["interval_count"] = len(intervals_in_window)
def _update_extreme_interval(extreme_interval: dict | None, price_data: dict, key: str) -> dict:
"""
Update extreme interval for min/max sensors.
Args:
extreme_interval: Current extreme interval or None
price_data: New price data to compare
key: Sensor key to determine if min or max
Returns:
Updated extreme interval
"""
if extreme_interval is None:
return price_data
price = price_data.get("total")
extreme_price = extreme_interval.get("total")
if price is None or extreme_price is None:
return extreme_interval
is_new_extreme = ("min" in key and price < extreme_price) or ("max" in key and price > extreme_price)
return price_data if is_new_extreme else extreme_interval
def add_next_avg_attributes(
attributes: dict,
key: str,
coordinator: TibberPricesDataUpdateCoordinator,
) -> None:
"""
Add attributes for next N hours average price sensors.
Args:
attributes: Dictionary to add attributes to
key: The sensor entity key
coordinator: The data update coordinator
"""
now = dt_util.now()
# Extract hours from sensor key (e.g., "next_avg_3h" -> 3)
try:
hours = int(key.replace("next_avg_", "").replace("h", ""))
except (ValueError, AttributeError):
return
# Get next interval start time (this is where the calculation begins)
next_interval_start = now + timedelta(minutes=MINUTES_PER_INTERVAL)
# Calculate the end of the time window
window_end = next_interval_start + timedelta(hours=hours)
# Get all price intervals
price_info = coordinator.data.get("priceInfo", {})
today_prices = price_info.get("today", [])
tomorrow_prices = price_info.get("tomorrow", [])
all_prices = today_prices + tomorrow_prices
if not all_prices:
return
# Find all intervals in the window
intervals_in_window = []
for price_data in all_prices:
starts_at = dt_util.parse_datetime(price_data["startsAt"])
if starts_at is None:
continue
starts_at = dt_util.as_local(starts_at)
if next_interval_start <= starts_at < window_end:
intervals_in_window.append(price_data)
# Add timestamp attribute (start of next interval - where calculation begins)
if intervals_in_window:
attributes["timestamp"] = intervals_in_window[0].get("startsAt")
attributes["interval_count"] = len(intervals_in_window)
attributes["hours"] = hours
def add_price_forecast_attributes(
attributes: dict,
coordinator: TibberPricesDataUpdateCoordinator,
) -> None:
"""
Add forecast attributes for the price forecast sensor.
Args:
attributes: Dictionary to add attributes to
coordinator: The data update coordinator
"""
future_prices = get_future_prices(coordinator, max_intervals=MAX_FORECAST_INTERVALS)
if not future_prices:
attributes["intervals"] = []
attributes["intervals_by_hour"] = []
attributes["data_available"] = False
return
# Add timestamp attribute (first future interval)
if future_prices:
attributes["timestamp"] = future_prices[0]["interval_start"]
attributes["intervals"] = future_prices
attributes["data_available"] = True
# Group by hour for easier consumption in dashboards
hours: dict[str, Any] = {}
for interval in future_prices:
starts_at = datetime.fromisoformat(interval["interval_start"])
hour_key = starts_at.strftime("%Y-%m-%d %H")
if hour_key not in hours:
hours[hour_key] = {
"hour": starts_at.hour,
"day": interval["day"],
"date": starts_at.date().isoformat(),
"intervals": [],
"min_price": None,
"max_price": None,
"avg_price": 0,
"avg_rating": None, # Initialize rating tracking
"ratings_available": False, # Track if any ratings are available
}
# Create interval data with both price and rating info
interval_data = {
"minute": starts_at.minute,
"price": interval["price"],
"price_minor": interval["price_minor"],
"level": interval["level"], # Price level from priceInfo
"time": starts_at.strftime("%H:%M"),
}
# Add rating data if available
if interval["rating"] is not None:
interval_data["rating"] = interval["rating"]
interval_data["rating_level"] = interval["rating_level"]
hours[hour_key]["ratings_available"] = True
hours[hour_key]["intervals"].append(interval_data)
# Track min/max/avg for the hour
price = interval["price"]
if hours[hour_key]["min_price"] is None or price < hours[hour_key]["min_price"]:
hours[hour_key]["min_price"] = price
if hours[hour_key]["max_price"] is None or price > hours[hour_key]["max_price"]:
hours[hour_key]["max_price"] = price
# Calculate averages
for hour_data in hours.values():
prices = [interval["price"] for interval in hour_data["intervals"]]
if prices:
hour_data["avg_price"] = sum(prices) / len(prices)
hour_data["min_price"] = hour_data["min_price"]
hour_data["max_price"] = hour_data["max_price"]
# Calculate average rating if ratings are available
if hour_data["ratings_available"]:
ratings = [interval.get("rating") for interval in hour_data["intervals"] if "rating" in interval]
if ratings:
hour_data["avg_rating"] = sum(ratings) / len(ratings)
# Convert to list sorted by hour
attributes["intervals_by_hour"] = [hour_data for _, hour_data in sorted(hours.items())]
def add_volatility_attributes(
attributes: dict,
cached_data: dict,
) -> None:
"""
Add attributes for volatility sensors.
Args:
attributes: Dictionary to add attributes to
cached_data: Dictionary containing cached sensor data
"""
if cached_data.get("volatility_attributes"):
attributes.update(cached_data["volatility_attributes"])
def get_prices_for_volatility(
volatility_type: str,
price_info: dict,
) -> list[float]:
"""
Get price list for volatility calculation based on type.
Args:
volatility_type: One of "today", "tomorrow", "next_24h", "today_tomorrow"
price_info: Price information dictionary from coordinator data
Returns:
List of prices to analyze
"""
if volatility_type == "today":
return [float(p["total"]) for p in price_info.get("today", []) if "total" in p]
if volatility_type == "tomorrow":
return [float(p["total"]) for p in price_info.get("tomorrow", []) if "total" in p]
if volatility_type == "next_24h":
# Rolling 24h from now
now = dt_util.now()
end_time = now + timedelta(hours=24)
prices = []
for day_key in ["today", "tomorrow"]:
for price_data in price_info.get(day_key, []):
starts_at = dt_util.parse_datetime(price_data.get("startsAt"))
if starts_at is None:
continue
starts_at = dt_util.as_local(starts_at)
if now <= starts_at < end_time and "total" in price_data:
prices.append(float(price_data["total"]))
return prices
if volatility_type == "today_tomorrow":
# Combined today + tomorrow
prices = []
for day_key in ["today", "tomorrow"]:
for price_data in price_info.get(day_key, []):
if "total" in price_data:
prices.append(float(price_data["total"]))
return prices
return []
def add_volatility_type_attributes(
volatility_attributes: dict,
volatility_type: str,
price_info: dict,
thresholds: dict,
) -> None:
"""
Add type-specific attributes for volatility sensors.
Args:
volatility_attributes: Dictionary to add type-specific attributes to
volatility_type: Type of volatility calculation
price_info: Price information dictionary from coordinator data
thresholds: Volatility thresholds configuration
"""
# Add timestamp for calendar day volatility sensors (midnight of the day)
if volatility_type == "today":
today_data = price_info.get("today", [])
if today_data:
volatility_attributes["timestamp"] = today_data[0].get("startsAt")
elif volatility_type == "tomorrow":
tomorrow_data = price_info.get("tomorrow", [])
if tomorrow_data:
volatility_attributes["timestamp"] = tomorrow_data[0].get("startsAt")
elif volatility_type == "today_tomorrow":
# For combined today+tomorrow, use today's midnight
today_data = price_info.get("today", [])
if today_data:
volatility_attributes["timestamp"] = today_data[0].get("startsAt")
# Add breakdown for today vs tomorrow
today_prices = [float(p["total"]) for p in price_info.get("today", []) if "total" in p]
tomorrow_prices = [float(p["total"]) for p in price_info.get("tomorrow", []) if "total" in p]
if today_prices:
today_vol = calculate_volatility_level(today_prices, **thresholds)
today_spread = (max(today_prices) - min(today_prices)) * 100
volatility_attributes["today_spread"] = round(today_spread, 2)
volatility_attributes["today_volatility"] = today_vol
volatility_attributes["interval_count_today"] = len(today_prices)
if tomorrow_prices:
tomorrow_vol = calculate_volatility_level(tomorrow_prices, **thresholds)
tomorrow_spread = (max(tomorrow_prices) - min(tomorrow_prices)) * 100
volatility_attributes["tomorrow_spread"] = round(tomorrow_spread, 2)
volatility_attributes["tomorrow_volatility"] = tomorrow_vol
volatility_attributes["interval_count_tomorrow"] = len(tomorrow_prices)
elif volatility_type == "next_24h":
# Add time window info
now = dt_util.now()
volatility_attributes["timestamp"] = now.isoformat()
def get_future_prices(
coordinator: TibberPricesDataUpdateCoordinator,
max_intervals: int | None = None,
) -> list[dict] | None:
"""
Get future price data for multiple upcoming intervals.
Args:
coordinator: The data update coordinator
max_intervals: Maximum number of future intervals to return
Returns:
List of upcoming price intervals with timestamps and prices
"""
if not coordinator.data:
return None
price_info = coordinator.data.get("priceInfo", {})
today_prices = price_info.get("today", [])
tomorrow_prices = price_info.get("tomorrow", [])
all_prices = today_prices + tomorrow_prices
if not all_prices:
return None
now = dt_util.now()
# Initialize the result list
future_prices = []
# Track the maximum intervals to return
intervals_to_return = MAX_FORECAST_INTERVALS if max_intervals is None else max_intervals
for day_key in ["today", "tomorrow"]:
for price_data in price_info.get(day_key, []):
starts_at = dt_util.parse_datetime(price_data["startsAt"])
if starts_at is None:
continue
starts_at = dt_util.as_local(starts_at)
interval_end = starts_at + timedelta(minutes=MINUTES_PER_INTERVAL)
if starts_at > now:
future_prices.append(
{
"interval_start": starts_at.isoformat(),
"interval_end": interval_end.isoformat(),
"price": float(price_data["total"]),
"price_minor": round(float(price_data["total"]) * 100, 2),
"level": price_data.get("level", "NORMAL"),
"rating": price_data.get("difference", None),
"rating_level": price_data.get("rating_level"),
"day": day_key,
}
)
# Sort by start time
future_prices.sort(key=lambda x: x["interval_start"])
# Limit to the requested number of intervals
return future_prices[:intervals_to_return] if future_prices else None
def get_current_interval_data(
coordinator: TibberPricesDataUpdateCoordinator,
) -> dict | None:
"""
Get the current interval data from coordinator.
Args:
coordinator: The data update coordinator
Returns:
Current interval data dictionary or None
"""
if not coordinator.data:
return None
price_info = coordinator.data.get("priceInfo", {})
now = dt_util.now()
return find_price_data_for_interval(price_info, now)
def add_period_timing_attributes(
attributes: dict,
key: str,
state_value: Any = None,
) -> None:
"""
Add timestamp and icon_color attributes for best_price/peak_price timing sensors.
The timestamp indicates when the sensor value was calculated:
- Quarter-hour sensors (end_time, next_start_time): Timestamp of current 15-min interval
- Minute-update sensors (remaining_minutes, progress, next_in_minutes): Current minute with :00 seconds
Args:
attributes: Dictionary to add attributes to
key: The sensor entity key (e.g., "best_price_end_time")
state_value: Current sensor value for icon_color calculation
"""
# Determine if this is a quarter-hour or minute-update sensor
is_quarter_hour_sensor = key.endswith(("_end_time", "_next_start_time"))
now = dt_util.now()
if is_quarter_hour_sensor:
# Quarter-hour sensors: Use timestamp of current 15-minute interval
# Round down to the nearest quarter hour (:00, :15, :30, :45)
minute = (now.minute // 15) * 15
timestamp = now.replace(minute=minute, second=0, microsecond=0)
else:
# Minute-update sensors: Use current minute with :00 seconds
# This ensures clean timestamps despite timer fluctuations
timestamp = now.replace(second=0, microsecond=0)
attributes["timestamp"] = timestamp.isoformat()
# Add icon_color for dynamic styling
add_icon_color_attribute(attributes, key=key, state_value=state_value)