hass.tibber_prices/custom_components/tibber_prices/services/formatters.py
2025-12-11 23:07:06 +00:00

422 lines
17 KiB
Python

"""
Data formatting utilities for services.
This module contains data transformation and formatting functions used across
multiple service handlers, including level normalization, hourly aggregation,
and period data extraction.
Functions:
normalize_level_filter: Convert level filter values to uppercase
normalize_rating_level_filter: Convert rating level filter values to uppercase
aggregate_hourly_exact: Aggregate 15-minute intervals to exact hourly averages
get_period_data: Extract period summary data instead of interval data
get_level_translation: Get translated name for price level or rating level
Used by:
- services/chartdata.py: Main data export service
- services/apexcharts.py: ApexCharts YAML generation
"""
from __future__ import annotations
from datetime import datetime, time
from typing import Any
from custom_components.tibber_prices.const import (
DEFAULT_PRICE_RATING_THRESHOLD_HIGH,
DEFAULT_PRICE_RATING_THRESHOLD_LOW,
get_translation,
)
from custom_components.tibber_prices.coordinator.helpers import (
get_intervals_for_day_offsets,
)
from custom_components.tibber_prices.sensor.helpers import aggregate_level_data, aggregate_rating_data
def normalize_level_filter(value: list[str] | None) -> list[str] | None:
"""Convert level filter values to uppercase for case-insensitive comparison."""
if value is None:
return None
return [v.upper() for v in value]
def normalize_rating_level_filter(value: list[str] | None) -> list[str] | None:
"""Convert rating level filter values to uppercase for case-insensitive comparison."""
if value is None:
return None
return [v.upper() for v in value]
def aggregate_hourly_exact( # noqa: PLR0913, PLR0912, PLR0915
intervals: list[dict],
start_time_field: str,
price_field: str,
*,
coordinator: Any,
use_subunit_currency: bool = False,
round_decimals: int | None = None,
include_level: bool = False,
include_rating_level: bool = False,
level_filter: list[str] | None = None,
rating_level_filter: list[str] | None = None,
include_average: bool = False,
level_field: str = "level",
rating_level_field: str = "rating_level",
average_field: str = "average",
day_average: float | None = None,
threshold_low: float = DEFAULT_PRICE_RATING_THRESHOLD_LOW,
threshold_high: float = DEFAULT_PRICE_RATING_THRESHOLD_HIGH,
period_timestamps: set[str] | None = None,
) -> list[dict]:
"""
Aggregate 15-minute intervals to exact hourly averages.
Each hour uses exactly 4 intervals (00:00, 00:15, 00:30, 00:45).
Returns data points at the start of each hour.
Args:
intervals: List of 15-minute price intervals
start_time_field: Custom name for start time field
price_field: Custom name for price field
coordinator: Data update coordinator instance (required)
use_subunit_currency: Convert to subunit currency units (cents/øre)
round_decimals: Optional decimal rounding
include_level: Include aggregated level field
include_rating_level: Include aggregated rating_level field
level_filter: Filter intervals by level values
rating_level_filter: Filter intervals by rating_level values
include_average: Include day average in output
level_field: Custom name for level field
rating_level_field: Custom name for rating_level field
average_field: Custom name for average field
day_average: Day average value to include
threshold_low: Rating level threshold (low/normal boundary)
threshold_high: Rating level threshold (normal/high boundary)
period_timestamps: Set of timestamps to filter by (period filter)
Returns:
List of hourly data points with aggregated values
"""
if not intervals:
return []
hourly_data = []
i = 0
while i < len(intervals):
interval = intervals[i]
start_time_str = interval.get("startsAt")
if not start_time_str:
i += 1
continue
# Get timestamp (already datetime in local timezone)
time = coordinator.time
start_time = start_time_str # Already datetime object
if not start_time:
i += 1
continue
# Check if this is the start of an hour (:00)
if start_time.minute != 0:
i += 1
continue
# Collect intervals for this hour (with optional filtering)
intervals_per_hour = time.minutes_to_intervals(60)
hour_intervals = []
hour_interval_data = [] # Complete interval data for aggregation functions
for j in range(intervals_per_hour):
if i + j < len(intervals):
interval = intervals[i + j]
# Apply period filter if specified (check startsAt timestamp)
if period_timestamps is not None:
interval_start = interval.get("startsAt")
if interval_start and interval_start not in period_timestamps:
continue
# Apply level filter if specified
if level_filter is not None and "level" in interval and interval["level"] not in level_filter:
continue
# Apply rating_level filter if specified
if (
rating_level_filter is not None
and "rating_level" in interval
and interval["rating_level"] not in rating_level_filter
):
continue
price = interval.get("total")
if price is not None:
hour_intervals.append(price)
hour_interval_data.append(interval)
# Calculate average if we have data
if hour_intervals:
avg_price = sum(hour_intervals) / len(hour_intervals)
# Convert to subunit currency (cents/øre) if requested
avg_price = round(avg_price * 100, 2) if use_subunit_currency else round(avg_price, 4)
# Apply custom rounding if specified
if round_decimals is not None:
avg_price = round(avg_price, round_decimals)
data_point = {
start_time_field: start_time_str.isoformat()
if hasattr(start_time_str, "isoformat")
else start_time_str,
price_field: avg_price,
}
# Add aggregated level using same logic as sensors
if include_level and hour_interval_data:
aggregated_level = aggregate_level_data(hour_interval_data)
if aggregated_level:
data_point[level_field] = aggregated_level.upper() # Convert back to uppercase
# Add aggregated rating_level using same logic as sensors
if include_rating_level and hour_interval_data:
aggregated_rating = aggregate_rating_data(hour_interval_data, threshold_low, threshold_high)
if aggregated_rating:
data_point[rating_level_field] = aggregated_rating.upper() # Convert back to uppercase
# Add average if requested
if include_average and day_average is not None:
data_point[average_field] = day_average
hourly_data.append(data_point)
# Move to next hour (skip intervals_per_hour)
i += time.minutes_to_intervals(60)
return hourly_data
def get_period_data( # noqa: PLR0913, PLR0912, PLR0915
*,
coordinator: Any,
period_filter: str,
days: list[str],
output_format: str,
subunit_currency: bool,
round_decimals: int | None,
level_filter: list[str] | None,
rating_level_filter: list[str] | None,
include_level: bool,
include_rating_level: bool,
start_time_field: str,
end_time_field: str,
price_field: str,
level_field: str,
rating_level_field: str,
data_key: str,
insert_nulls: str,
add_trailing_null: bool,
) -> dict[str, Any]:
"""
Get period summary data instead of interval data.
When period_filter is specified, returns the precomputed period summaries
from the coordinator instead of filtering intervals.
Note: Period prices (price_median) are stored in base currency units (€/kr/$/£).
They are converted to subunit currency units (ct/øre/¢/p) if subunit_currency=True.
Args:
coordinator: Data coordinator with period summaries
period_filter: "best_price" or "peak_price"
days: List of days to include
output_format: "array_of_objects" or "array_of_arrays"
subunit_currency: If False, convert prices from minor to major units
round_decimals: Optional decimal rounding
level_filter: Optional level filter
rating_level_filter: Optional rating level filter
include_level: Whether to include level field in output
include_rating_level: Whether to include rating_level field in output
start_time_field: Custom name for start time field
end_time_field: Custom name for end time field
price_field: Custom name for price field
level_field: Custom name for level field
rating_level_field: Custom name for rating_level field
data_key: Top-level key name in response
insert_nulls: NULL insertion mode ('none', 'segments', 'all')
add_trailing_null: Whether to add trailing null point
Returns:
Dictionary with period data in requested format
"""
periods_data = coordinator.data.get("pricePeriods", {})
period_data = periods_data.get(period_filter)
if not period_data:
return {data_key: []}
period_summaries = period_data.get("periods", [])
if not period_summaries:
return {data_key: []}
chart_data = []
# Filter periods by day if requested
filtered_periods = []
if days:
# Use helper to get intervals for requested days, extract their dates
# Map day keys to offsets: yesterday=-1, today=0, tomorrow=1
day_offset_map = {"yesterday": -1, "today": 0, "tomorrow": 1}
offsets = [day_offset_map[day] for day in days]
day_intervals = get_intervals_for_day_offsets(coordinator.data, offsets)
allowed_dates = {interval["startsAt"].date() for interval in day_intervals if interval.get("startsAt")}
# Calculate day boundaries for trimming
# Find min/max dates to determine the overall requested window
if allowed_dates:
min_date = min(allowed_dates)
max_date = max(allowed_dates)
# CRITICAL: Trim periods that span day boundaries
# Window start = midnight of first requested day
# Window end = midnight of day AFTER last requested day (exclusive boundary)
window_start = datetime.combine(min_date, time.min)
window_end = datetime.combine(max_date, time.max).replace(microsecond=999999)
# Make timezone-aware using coordinator's time service
window_start = coordinator.time.as_local(window_start)
window_end = coordinator.time.as_local(window_end)
# Filter and trim periods to window
for period in period_summaries:
start = period.get("start")
end = period.get("end")
if not start:
continue
# Skip periods that end before window or start after window
if end and end <= window_start:
continue
if start >= window_end:
continue
# Trim period to window boundaries
trimmed_period = period.copy()
if start < window_start:
trimmed_period["start"] = window_start
if end and end > window_end:
trimmed_period["end"] = window_end
filtered_periods.append(trimmed_period)
else:
filtered_periods = period_summaries
# Apply level and rating_level filters
for period in filtered_periods:
# Apply level filter (normalize to uppercase for comparison)
if level_filter and "level" in period and period["level"].upper() not in level_filter:
continue
# Apply rating_level filter (normalize to uppercase for comparison)
if (
rating_level_filter
and "rating_level" in period
and period["rating_level"].upper() not in rating_level_filter
):
continue
# Build data point based on output format
if output_format == "array_of_objects":
# Map period fields to custom field names
# Period has: start, end, level, rating_level, price_mean, price_median, price_min, price_max
data_point = {}
# Start time
start = period["start"]
data_point[start_time_field] = start.isoformat() if hasattr(start, "isoformat") else start
# End time
end = period.get("end")
data_point[end_time_field] = end.isoformat() if end and hasattr(end, "isoformat") else end
# Price (use price_median from period for visual consistency with sensor states)
# Median is more representative than mean for periods with gap tolerance
# (single "normal" intervals between cheap/expensive ones don't skew the display)
price_median = period.get("price_median", 0.0)
# Convert to subunit currency if subunit_currency=True (periods stored in base currency)
if subunit_currency:
price_median = price_median * 100
# Apply rounding: use round_decimals if provided, otherwise default precision
precision = round_decimals if round_decimals is not None else (2 if subunit_currency else 4)
price_median = round(price_median, precision)
data_point[price_field] = price_median
# Level (only if requested and present)
if include_level and "level" in period:
data_point[level_field] = period["level"].upper()
# Rating level (only if requested and present)
if include_rating_level and "rating_level" in period:
data_point[rating_level_field] = period["rating_level"].upper()
chart_data.append(data_point)
else: # array_of_arrays
# For array_of_arrays, include 2-3 points per period depending on insert_nulls:
# Always:
# 1. Start time with price (begin period)
# 2. End time with price (hold price until end)
# If insert_nulls='segments' or 'all':
# 3. End time with NULL (cleanly terminate segment for ApexCharts)
# Use price_median for consistency with sensor states (more representative for periods)
price_median = period.get("price_median", 0.0)
# Convert to subunit currency if subunit_currency=True (periods stored in base currency)
if subunit_currency:
price_median = price_median * 100
# Apply rounding: use round_decimals if provided, otherwise default precision
precision = round_decimals if round_decimals is not None else (2 if subunit_currency else 4)
price_median = round(price_median, precision)
start = period["start"]
end = period.get("end")
start_serialized = start.isoformat() if hasattr(start, "isoformat") else start
end_serialized = end.isoformat() if end and hasattr(end, "isoformat") else end
# Add data points per period
chart_data.append([start_serialized, price_median]) # 1. Start with price
if end_serialized:
chart_data.append([end_serialized, price_median]) # 2. End with price (hold level)
# 3. Add NULL terminator only if insert_nulls is enabled
if insert_nulls in ("segments", "all"):
chart_data.append([end_serialized, None]) # 3. End with NULL (terminate segment)
# Add trailing null point if requested (independent of insert_nulls)
# This adds an additional NULL at the end of the entire data series.
# If both insert_nulls and add_trailing_null are enabled, you get:
# - NULL terminator after each period (from insert_nulls)
# - Additional NULL at the very end (from add_trailing_null)
if add_trailing_null and chart_data:
if output_format == "array_of_objects":
null_point = {start_time_field: None, end_time_field: None}
for field in [price_field, level_field, rating_level_field]:
null_point[field] = None
chart_data.append(null_point)
else: # array_of_arrays
chart_data.append([None, None])
return {data_key: chart_data}
def get_level_translation(level_key: str, level_type: str, language: str) -> str:
"""Get translated name for a price level or rating level."""
level_key_lower = level_key.lower()
# Use correct translation key based on level_type
if level_type == "rating_level":
name = get_translation(["selector", "rating_level_filter", "options", level_key_lower], language)
else:
name = get_translation(["selector", "level_filter", "options", level_key_lower], language)
# Fallback to original key if translation not found
return name or level_key