mirror of
https://github.com/jpawlowski/hass.tibber_prices.git
synced 2026-03-29 21:03:40 +00:00
Moved filter logic and all period attribute calculations from binary_sensor.py
to coordinator.py and period_utils.py, following Home Assistant best practices
for data flow architecture.
ARCHITECTURE CHANGES:
Binary Sensor Simplification (~225 lines removed):
- Removed _build_periods_summary, _add_price_diff_for_period (calculation logic)
- Removed _get_period_intervals_from_price_info (107 lines, interval reconstruction)
- Removed _should_show_periods, _check_volatility_filter, _check_level_filter
- Removed _build_empty_periods_result (filtering result builder)
- Removed _get_price_hours_attributes (24 lines, dead code)
- Removed datetime import (unused after cleanup)
- New: _build_final_attributes_simple (~20 lines, timestamp-only)
- Result: Pure display-only logic, reads pre-calculated data from coordinator
Coordinator Enhancement (+160 lines):
- Added _should_show_periods(): UND-Verknüpfung of volatility and level filters
- Added _check_volatility_filter(): Checks min_volatility threshold
- Added _check_level_filter(): Checks min/max level bounds
- Enhanced _calculate_periods_for_price_info(): Applies filters before period calculation
- Returns empty periods when filters don't match (instead of calculating unnecessarily)
- Passes volatility thresholds (moderate/high/very_high) to PeriodConfig
Period Utils Refactoring (+110 lines):
- Extended PeriodConfig with threshold_volatility_moderate/high/very_high
- Added PeriodData NamedTuple: Groups timing data (start, end, length, position)
- Added PeriodStatistics NamedTuple: Groups calculated stats (prices, volatility, ratings)
- Added ThresholdConfig NamedTuple: Groups all thresholds + reverse_sort flag
- New _calculate_period_price_statistics(): Extracts price_avg/min/max/spread calculation
- New _build_period_summary_dict(): Builds final dict with correct attribute ordering
- Enhanced _extract_period_summaries(): Now calculates ALL attributes (no longer lightweight):
* price_avg, price_min, price_max, price_spread (in minor units: ct/øre)
* volatility (low/moderate/high/very_high based on absolute thresholds)
* rating_difference_% (average of interval differences)
* period_price_diff_from_daily_min/max (period avg vs daily reference)
* aggregated level and rating_level
* period_interval_count (renamed from interval_count for clarity)
- Removed interval_starts array (redundant - start/end/count sufficient)
- Function signature refactored from 9→4 parameters using NamedTuples
Code Organization (HA Best Practice):
- Moved calculate_volatility_level() from const.py to price_utils.py
- Rule: const.py should contain only constants, no functions
- Removed duplicate VOLATILITY_THRESHOLD_* constants from const.py
- Updated imports in sensor.py, services.py, period_utils.py
DATA FLOW:
Before:
API → Coordinator (basic enrichment) → Binary Sensor (calculate everything on each access)
After:
API → Coordinator (enrichment + filtering + period calculation with ALL attributes) →
Cached Data → Binary Sensor (display + timestamp only)
ATTRIBUTE STRUCTURE:
Period summaries now contain (following copilot-instructions.md ordering):
1. Time: start, end, duration_minutes
2. Decision: level, rating_level, rating_difference_%
3. Prices: price_avg, price_min, price_max, price_spread, volatility
4. Differences: period_price_diff_from_daily_min/max (conditional)
5. Details: period_interval_count, period_position
6. Meta: periods_total, periods_remaining
BREAKING CHANGES: None
- Period data structure enhanced but backwards compatible
- Binary sensor API unchanged (state + attributes)
Impact: Binary sensors now display pre-calculated data from coordinator instead
of calculating on every access. Reduces complexity, improves performance, and
centralizes business logic following Home Assistant coordinator pattern. All
period filtering (volatility + level) now happens in coordinator before caching.
503 lines
16 KiB
Python
503 lines
16 KiB
Python
"""Utility functions for price data calculations."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from datetime import datetime, timedelta
|
|
from typing import Any
|
|
|
|
from homeassistant.util import dt as dt_util
|
|
|
|
from .const import (
|
|
DEFAULT_VOLATILITY_THRESHOLD_HIGH,
|
|
DEFAULT_VOLATILITY_THRESHOLD_MODERATE,
|
|
DEFAULT_VOLATILITY_THRESHOLD_VERY_HIGH,
|
|
PRICE_LEVEL_MAPPING,
|
|
PRICE_LEVEL_NORMAL,
|
|
PRICE_RATING_NORMAL,
|
|
VOLATILITY_HIGH,
|
|
VOLATILITY_LOW,
|
|
VOLATILITY_MODERATE,
|
|
VOLATILITY_VERY_HIGH,
|
|
)
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
MINUTES_PER_INTERVAL = 15
|
|
|
|
|
|
def calculate_volatility_level(
|
|
spread: float,
|
|
threshold_moderate: float | None = None,
|
|
threshold_high: float | None = None,
|
|
threshold_very_high: float | None = None,
|
|
) -> str:
|
|
"""
|
|
Calculate volatility level from price spread.
|
|
|
|
Volatility indicates how much prices fluctuate during a period, which helps
|
|
determine whether active load shifting is worthwhile.
|
|
|
|
Args:
|
|
spread: Absolute price difference between max and min (in minor currency units, e.g., ct or øre)
|
|
threshold_moderate: Custom threshold for MODERATE level (default: use DEFAULT_VOLATILITY_THRESHOLD_MODERATE)
|
|
threshold_high: Custom threshold for HIGH level (default: use DEFAULT_VOLATILITY_THRESHOLD_HIGH)
|
|
threshold_very_high: Custom threshold for VERY_HIGH level (default: use DEFAULT_VOLATILITY_THRESHOLD_VERY_HIGH)
|
|
|
|
Returns:
|
|
Volatility level: "LOW", "MODERATE", "HIGH", or "VERY_HIGH" (uppercase)
|
|
|
|
Examples:
|
|
- spread < 5: LOW → minimal optimization potential
|
|
- 5 ≤ spread < 15: MODERATE → some optimization worthwhile
|
|
- 15 ≤ spread < 30: HIGH → strong optimization recommended
|
|
- spread ≥ 30: VERY_HIGH → maximum optimization potential
|
|
|
|
"""
|
|
# Use provided thresholds or fall back to constants
|
|
t_moderate = threshold_moderate if threshold_moderate is not None else DEFAULT_VOLATILITY_THRESHOLD_MODERATE
|
|
t_high = threshold_high if threshold_high is not None else DEFAULT_VOLATILITY_THRESHOLD_HIGH
|
|
t_very_high = threshold_very_high if threshold_very_high is not None else DEFAULT_VOLATILITY_THRESHOLD_VERY_HIGH
|
|
|
|
if spread < t_moderate:
|
|
return VOLATILITY_LOW
|
|
if spread < t_high:
|
|
return VOLATILITY_MODERATE
|
|
if spread < t_very_high:
|
|
return VOLATILITY_HIGH
|
|
return VOLATILITY_VERY_HIGH
|
|
|
|
|
|
def calculate_trailing_average_for_interval(
|
|
interval_start: datetime,
|
|
all_prices: list[dict[str, Any]],
|
|
) -> float | None:
|
|
"""
|
|
Calculate the trailing 24-hour average price for a specific interval.
|
|
|
|
Args:
|
|
interval_start: The start time of the interval we're calculating for
|
|
all_prices: List of all available price intervals (yesterday + today + tomorrow)
|
|
|
|
Returns:
|
|
The average price of all intervals in the 24 hours before interval_start,
|
|
or None if insufficient data is available.
|
|
|
|
"""
|
|
if not all_prices:
|
|
return None
|
|
|
|
# Calculate the lookback period: 24 hours before this interval
|
|
lookback_start = interval_start - timedelta(hours=24)
|
|
|
|
# Collect all prices that fall within the 24-hour lookback window
|
|
matching_prices = []
|
|
|
|
for price_data in all_prices:
|
|
starts_at_str = price_data.get("startsAt")
|
|
if not starts_at_str:
|
|
continue
|
|
|
|
# Parse the timestamp
|
|
price_time = dt_util.parse_datetime(starts_at_str)
|
|
if price_time is None:
|
|
continue
|
|
|
|
# Convert to local timezone for comparison
|
|
price_time = dt_util.as_local(price_time)
|
|
|
|
# Check if this price falls within our lookback window
|
|
# Include prices that start >= lookback_start and start < interval_start
|
|
if lookback_start <= price_time < interval_start:
|
|
total_price = price_data.get("total")
|
|
if total_price is not None:
|
|
matching_prices.append(float(total_price))
|
|
|
|
if not matching_prices:
|
|
_LOGGER.debug(
|
|
"No prices found in 24-hour lookback window for interval starting at %s (lookback: %s to %s)",
|
|
interval_start,
|
|
lookback_start,
|
|
interval_start,
|
|
)
|
|
return None
|
|
|
|
# Calculate and return the average
|
|
return sum(matching_prices) / len(matching_prices)
|
|
|
|
|
|
def calculate_difference_percentage(
|
|
current_price: float,
|
|
trailing_average: float | None,
|
|
) -> float | None:
|
|
"""
|
|
Calculate the difference percentage between current price and trailing average.
|
|
|
|
This mimics the API's "difference" field from priceRating endpoint.
|
|
|
|
Args:
|
|
current_price: The current interval's price
|
|
trailing_average: The 24-hour trailing average price
|
|
|
|
Returns:
|
|
The percentage difference: ((current - average) / average) * 100
|
|
or None if trailing_average is None or zero.
|
|
|
|
"""
|
|
if trailing_average is None or trailing_average == 0:
|
|
return None
|
|
|
|
return ((current_price - trailing_average) / trailing_average) * 100
|
|
|
|
|
|
def calculate_rating_level(
|
|
difference: float | None,
|
|
threshold_low: float,
|
|
threshold_high: float,
|
|
) -> str | None:
|
|
"""
|
|
Calculate the rating level based on difference percentage and thresholds.
|
|
|
|
This mimics the API's "level" field from priceRating endpoint.
|
|
|
|
Args:
|
|
difference: The difference percentage (from calculate_difference_percentage)
|
|
threshold_low: The low threshold percentage (typically -100 to 0)
|
|
threshold_high: The high threshold percentage (typically 0 to 100)
|
|
|
|
Returns:
|
|
"LOW" if difference <= threshold_low
|
|
"HIGH" if difference >= threshold_high
|
|
"NORMAL" otherwise
|
|
None if difference is None
|
|
|
|
"""
|
|
if difference is None:
|
|
return None
|
|
|
|
# If difference falls in both ranges (shouldn't normally happen), return NORMAL
|
|
if difference <= threshold_low and difference >= threshold_high:
|
|
return PRICE_RATING_NORMAL
|
|
|
|
# Classify based on thresholds
|
|
if difference <= threshold_low:
|
|
return "LOW"
|
|
|
|
if difference >= threshold_high:
|
|
return "HIGH"
|
|
|
|
return PRICE_RATING_NORMAL
|
|
|
|
|
|
def _process_price_interval(
|
|
price_interval: dict[str, Any],
|
|
all_prices: list[dict[str, Any]],
|
|
threshold_low: float,
|
|
threshold_high: float,
|
|
day_label: str,
|
|
) -> None:
|
|
"""
|
|
Process a single price interval and add difference and rating_level.
|
|
|
|
Args:
|
|
price_interval: The price interval to process (modified in place)
|
|
all_prices: All available price intervals for lookback calculation
|
|
threshold_low: Low threshold percentage
|
|
threshold_high: High threshold percentage
|
|
day_label: Label for logging ("today" or "tomorrow")
|
|
|
|
"""
|
|
starts_at_str = price_interval.get("startsAt")
|
|
if not starts_at_str:
|
|
return
|
|
|
|
starts_at = dt_util.parse_datetime(starts_at_str)
|
|
if starts_at is None:
|
|
return
|
|
|
|
starts_at = dt_util.as_local(starts_at)
|
|
current_price = price_interval.get("total")
|
|
|
|
if current_price is None:
|
|
return
|
|
|
|
# Calculate trailing average
|
|
trailing_avg = calculate_trailing_average_for_interval(starts_at, all_prices)
|
|
|
|
# Calculate and set the difference and rating_level
|
|
if trailing_avg is not None:
|
|
difference = calculate_difference_percentage(float(current_price), trailing_avg)
|
|
price_interval["difference"] = difference
|
|
|
|
# Calculate rating_level based on difference
|
|
rating_level = calculate_rating_level(difference, threshold_low, threshold_high)
|
|
price_interval["rating_level"] = rating_level
|
|
else:
|
|
# Set to None if we couldn't calculate
|
|
price_interval["difference"] = None
|
|
price_interval["rating_level"] = None
|
|
_LOGGER.debug(
|
|
"Could not calculate trailing average for %s interval %s",
|
|
day_label,
|
|
starts_at,
|
|
)
|
|
|
|
|
|
def enrich_price_info_with_differences(
|
|
price_info: dict[str, Any],
|
|
threshold_low: float | None = None,
|
|
threshold_high: float | None = None,
|
|
) -> dict[str, Any]:
|
|
"""
|
|
Enrich price info with calculated 'difference' and 'rating_level' values.
|
|
|
|
Computes the trailing 24-hour average, difference percentage, and rating level
|
|
for each interval in today and tomorrow (excluding yesterday since it's historical).
|
|
|
|
Args:
|
|
price_info: Dictionary with 'yesterday', 'today', 'tomorrow' keys
|
|
threshold_low: Low threshold percentage for rating_level (defaults to -10)
|
|
threshold_high: High threshold percentage for rating_level (defaults to 10)
|
|
|
|
Returns:
|
|
Updated price_info dict with 'difference' and 'rating_level' added
|
|
|
|
"""
|
|
if threshold_low is None:
|
|
threshold_low = -10
|
|
if threshold_high is None:
|
|
threshold_high = 10
|
|
|
|
yesterday_prices = price_info.get("yesterday", [])
|
|
today_prices = price_info.get("today", [])
|
|
tomorrow_prices = price_info.get("tomorrow", [])
|
|
|
|
# Combine all prices for lookback calculation
|
|
all_prices = yesterday_prices + today_prices + tomorrow_prices
|
|
|
|
_LOGGER.debug(
|
|
"Enriching price info with differences and rating levels: "
|
|
"yesterday=%d, today=%d, tomorrow=%d, thresholds: low=%.2f, high=%.2f",
|
|
len(yesterday_prices),
|
|
len(today_prices),
|
|
len(tomorrow_prices),
|
|
threshold_low,
|
|
threshold_high,
|
|
)
|
|
|
|
# Process today's prices
|
|
for price_interval in today_prices:
|
|
_process_price_interval(price_interval, all_prices, threshold_low, threshold_high, "today")
|
|
|
|
# Process tomorrow's prices
|
|
for price_interval in tomorrow_prices:
|
|
_process_price_interval(price_interval, all_prices, threshold_low, threshold_high, "tomorrow")
|
|
|
|
return price_info
|
|
|
|
|
|
def find_price_data_for_interval(price_info: Any, target_time: datetime) -> dict | None:
|
|
"""
|
|
Find the price data for a specific 15-minute interval timestamp.
|
|
|
|
Args:
|
|
price_info: The price info dictionary from Tibber API
|
|
target_time: The target timestamp to find price data for
|
|
|
|
Returns:
|
|
Price data dict if found, None otherwise
|
|
|
|
"""
|
|
day_key = "tomorrow" if target_time.date() > dt_util.now().date() else "today"
|
|
search_days = [day_key, "tomorrow" if day_key == "today" else "today"]
|
|
|
|
for search_day in search_days:
|
|
day_prices = price_info.get(search_day, [])
|
|
if not day_prices:
|
|
continue
|
|
|
|
for price_data in day_prices:
|
|
starts_at = dt_util.parse_datetime(price_data["startsAt"])
|
|
if starts_at is None:
|
|
continue
|
|
|
|
starts_at = dt_util.as_local(starts_at)
|
|
interval_end = starts_at + timedelta(minutes=MINUTES_PER_INTERVAL)
|
|
if starts_at <= target_time < interval_end and starts_at.date() == target_time.date():
|
|
return price_data
|
|
|
|
return None
|
|
|
|
|
|
def aggregate_price_levels(levels: list[str]) -> str:
|
|
"""
|
|
Aggregate multiple price levels into a single representative level using median.
|
|
|
|
Takes a list of price level strings (e.g., "VERY_CHEAP", "NORMAL", "EXPENSIVE")
|
|
and returns the median level after sorting by numeric values. This naturally
|
|
tends toward "NORMAL" when levels are mixed.
|
|
|
|
Args:
|
|
levels: List of price level strings from intervals
|
|
|
|
Returns:
|
|
The median price level string, or PRICE_LEVEL_NORMAL if input is empty
|
|
|
|
"""
|
|
if not levels:
|
|
return PRICE_LEVEL_NORMAL
|
|
|
|
# Convert levels to numeric values and sort
|
|
numeric_values = [PRICE_LEVEL_MAPPING.get(level, 0) for level in levels]
|
|
numeric_values.sort()
|
|
|
|
# Get median (middle value for odd length, lower-middle for even length)
|
|
median_idx = len(numeric_values) // 2
|
|
median_value = numeric_values[median_idx]
|
|
|
|
# Convert back to level string
|
|
for level, value in PRICE_LEVEL_MAPPING.items():
|
|
if value == median_value:
|
|
return level
|
|
|
|
return PRICE_LEVEL_NORMAL
|
|
|
|
|
|
def aggregate_price_rating(differences: list[float], threshold_low: float, threshold_high: float) -> tuple[str, float]:
|
|
"""
|
|
Aggregate multiple price differences into a single rating level.
|
|
|
|
Calculates the average difference percentage across multiple intervals
|
|
and applies thresholds to determine the overall rating level.
|
|
|
|
Args:
|
|
differences: List of difference percentages from intervals
|
|
threshold_low: The low threshold percentage for LOW rating
|
|
threshold_high: The high threshold percentage for HIGH rating
|
|
|
|
Returns:
|
|
Tuple of (rating_level, average_difference)
|
|
rating_level: "LOW", "NORMAL", or "HIGH"
|
|
average_difference: The averaged difference percentage
|
|
|
|
"""
|
|
if not differences:
|
|
return PRICE_RATING_NORMAL, 0.0
|
|
|
|
# Filter out None values
|
|
valid_differences = [d for d in differences if d is not None]
|
|
if not valid_differences:
|
|
return PRICE_RATING_NORMAL, 0.0
|
|
|
|
# Calculate average difference
|
|
avg_difference = sum(valid_differences) / len(valid_differences)
|
|
|
|
# Apply thresholds
|
|
rating_level = calculate_rating_level(avg_difference, threshold_low, threshold_high)
|
|
|
|
return rating_level or PRICE_RATING_NORMAL, avg_difference
|
|
|
|
|
|
def aggregate_period_levels(interval_data_list: list[dict[str, Any]]) -> str | None:
|
|
"""
|
|
Aggregate price levels across multiple intervals in a period.
|
|
|
|
Extracts "level" from each interval and uses the same logic as
|
|
aggregate_price_levels() to determine the overall level for the period.
|
|
|
|
Args:
|
|
interval_data_list: List of price interval dictionaries with "level" keys
|
|
|
|
Returns:
|
|
The aggregated level string in lowercase (e.g., "very_cheap", "normal", "expensive"),
|
|
or None if no valid levels found
|
|
|
|
"""
|
|
levels: list[str] = []
|
|
for interval in interval_data_list:
|
|
level = interval.get("level")
|
|
if level is not None and isinstance(level, str):
|
|
levels.append(level)
|
|
|
|
if not levels:
|
|
return None
|
|
|
|
aggregated = aggregate_price_levels(levels)
|
|
# Convert to lowercase for consistency with other enum sensors
|
|
return aggregated.lower() if aggregated else None
|
|
|
|
|
|
def aggregate_period_ratings(
|
|
interval_data_list: list[dict[str, Any]],
|
|
threshold_low: float,
|
|
threshold_high: float,
|
|
) -> tuple[str | None, float | None]:
|
|
"""
|
|
Aggregate price ratings across multiple intervals in a period.
|
|
|
|
Extracts "difference" from each interval and uses the same logic as
|
|
aggregate_price_rating() to determine the overall rating for the period.
|
|
|
|
Args:
|
|
interval_data_list: List of price interval dictionaries with "difference" keys
|
|
threshold_low: The low threshold percentage for LOW rating
|
|
threshold_high: The high threshold percentage for HIGH rating
|
|
|
|
Returns:
|
|
Tuple of (rating_level, average_difference)
|
|
rating_level: "low", "normal", "high" (lowercase), or None if no valid data
|
|
average_difference: The averaged difference percentage, or None if no valid data
|
|
|
|
"""
|
|
differences: list[float] = []
|
|
for interval in interval_data_list:
|
|
diff = interval.get("difference")
|
|
if diff is not None:
|
|
differences.append(float(diff))
|
|
|
|
if not differences:
|
|
return None, None
|
|
|
|
rating_level, avg_diff = aggregate_price_rating(differences, threshold_low, threshold_high)
|
|
# Convert to lowercase for consistency with other enum sensors
|
|
return rating_level.lower() if rating_level else None, avg_diff
|
|
|
|
|
|
def calculate_price_trend(
|
|
current_price: float,
|
|
future_average: float,
|
|
threshold_rising: float = 5.0,
|
|
threshold_falling: float = -5.0,
|
|
) -> tuple[str, float]:
|
|
"""
|
|
Calculate price trend by comparing current price with future average.
|
|
|
|
Args:
|
|
current_price: Current interval price
|
|
future_average: Average price of future intervals
|
|
threshold_rising: Percentage threshold for rising trend (positive, default 5%)
|
|
threshold_falling: Percentage threshold for falling trend (negative, default -5%)
|
|
|
|
Returns:
|
|
Tuple of (trend_state, difference_percentage)
|
|
trend_state: "rising" | "falling" | "stable"
|
|
difference_percentage: % change from current to future ((future - current) / current * 100)
|
|
|
|
"""
|
|
if current_price == 0:
|
|
# Avoid division by zero
|
|
return "stable", 0.0
|
|
|
|
# Calculate percentage difference from current to future
|
|
diff_pct = ((future_average - current_price) / current_price) * 100
|
|
|
|
# Determine trend based on thresholds
|
|
# threshold_falling is negative, so we compare with it directly
|
|
if diff_pct > threshold_rising:
|
|
trend = "rising"
|
|
elif diff_pct < threshold_falling:
|
|
trend = "falling"
|
|
else:
|
|
trend = "stable"
|
|
|
|
return trend, diff_pct
|