Mirror of https://github.com/jpawlowski/hass.tibber_prices.git (synced 2026-03-30 21:33:39 +00:00)
Add resolution parameter to get_chartdata and get_apexcharts_yaml services, allowing users to choose between the original 15-minute intervals and aggregated hourly values for chart visualization. The implementation uses a rolling 5-interval window aggregation (-2, -1, 0, +1, +2 around :00 of each hour, i.e. five 15-minute intervals centered on the hour), matching the sensor rolling-hour methodology, and respects the user's CONF_AVERAGE_SENSOR_DISPLAY setting for mean vs. median calculation.

Changes:
- formatters.py: Add aggregate_to_hourly() function preserving original field names (startsAt, total, level, rating_level) for unified processing
- get_chartdata.py: Pre-aggregate data before processing when resolution is 'hourly', enabling the same code path for filters/insert_nulls/connect_segments
- get_apexcharts_yaml.py: Add resolution parameter and pass it to all 4 get_chartdata service calls in the generated JavaScript
- services.yaml: Add resolution field with interval/hourly selector
- icons.json: Add section icons for get_apexcharts_yaml fields
- translations: Add highlight_peak_price and resolution field translations for all 5 languages (en, de, sv, nb, nl)

Impact: Users can now generate cleaner charts with 24 hourly data points instead of 96 quarter-hourly intervals. The unified processing approach ensures all chart features (filters, null insertion, segment connection) work identically for both resolutions.
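As an illustration of the new parameter, a script or automation could call the service like this (a minimal sketch; the tibber_prices domain and the blocking/return_response flags are assumptions, not part of this change):

    # Request hourly-aggregated chart data instead of 15-minute intervals.
    response = await hass.services.async_call(
        "tibber_prices",
        "get_chartdata",
        {"resolution": "hourly"},  # "interval" keeps the raw quarter-hourly points
        blocking=True,
        return_response=True,
    )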
"""
|
|
Data formatting utilities for services.
|
|
|
|
This module contains data transformation and formatting functions used across
|
|
multiple service handlers, including level normalization, hourly aggregation,
|
|
and period data extraction.
|
|
|
|
Functions:
|
|
normalize_level_filter: Convert level filter values to uppercase
|
|
normalize_rating_level_filter: Convert rating level filter values to uppercase
|
|
aggregate_hourly_exact: Aggregate 15-minute intervals to exact hourly averages
|
|
get_period_data: Extract period summary data instead of interval data
|
|
get_level_translation: Get translated name for price level or rating level
|
|
|
|
Used by:
|
|
- services/chartdata.py: Main data export service
|
|
- services/apexcharts.py: ApexCharts YAML generation
|
|
|
|
"""
|
|
|
|

from __future__ import annotations

from datetime import datetime, time
from typing import Any

from custom_components.tibber_prices.const import (
    CONF_AVERAGE_SENSOR_DISPLAY,
    DEFAULT_AVERAGE_SENSOR_DISPLAY,
    DEFAULT_PRICE_RATING_THRESHOLD_HIGH,
    DEFAULT_PRICE_RATING_THRESHOLD_LOW,
    get_translation,
)
from custom_components.tibber_prices.coordinator.helpers import (
    get_intervals_for_day_offsets,
)
from custom_components.tibber_prices.sensor.helpers import aggregate_level_data, aggregate_rating_data
from custom_components.tibber_prices.utils.average import calculate_mean, calculate_median


def normalize_level_filter(value: list[str] | None) -> list[str] | None:
    """Convert level filter values to uppercase for case-insensitive comparison."""
    if value is None:
        return None
    return [v.upper() for v in value]


def normalize_rating_level_filter(value: list[str] | None) -> list[str] | None:
    """Convert rating level filter values to uppercase for case-insensitive comparison."""
    if value is None:
        return None
    return [v.upper() for v in value]


def aggregate_to_hourly(  # noqa: PLR0912
    intervals: list[dict],
    coordinator: Any,
    threshold_low: float = DEFAULT_PRICE_RATING_THRESHOLD_LOW,
    threshold_high: float = DEFAULT_PRICE_RATING_THRESHOLD_HIGH,
) -> list[dict]:
    """
    Aggregate 15-minute intervals to hourly using a rolling 5-interval window.

    Preserves original field names (startsAt, total, level, rating_level) so the
    aggregated data can be processed by the same code path as interval data.

    Uses the same methodology as the sensor rolling hour calculations:
    - 5-interval window: 2 before + center + 2 after (five 15-minute intervals)
    - Center interval is at :00 of each hour
    - Respects user's CONF_AVERAGE_SENSOR_DISPLAY setting (mean vs median)

    Example for the 10:00 data point:
    - Window includes the intervals starting at 09:30, 09:45, 10:00, 10:15, 10:30

    Args:
        intervals: List of 15-minute price intervals with startsAt, total, level, rating_level
        coordinator: Data update coordinator instance
        threshold_low: Rating level threshold (low/normal boundary)
        threshold_high: Rating level threshold (normal/high boundary)

    Returns:
        List of hourly data points with the same structure as the input
        (startsAt, total, level, rating_level)

    """
    if not intervals:
        return []

    # Get user's average display preference (mean or median)
    average_display = coordinator.config_entry.options.get(CONF_AVERAGE_SENSOR_DISPLAY, DEFAULT_AVERAGE_SENSOR_DISPLAY)
    use_median = average_display == "median"

    hourly_data = []

    # Iterate through all intervals, only processing those that start at :00
    for i, interval in enumerate(intervals):
        start_time = interval.get("startsAt")

        if not start_time:
            continue

        # Check if this is the start of an hour (:00)
        if start_time.minute != 0:
            continue

        # Collect the 5-interval rolling window: -2, -1, 0, +1, +2
        window_prices: list[float] = []
        window_intervals: list[dict] = []

        for offset in range(-2, 3):  # -2, -1, 0, +1, +2
            target_idx = i + offset
            if 0 <= target_idx < len(intervals):
                target_interval = intervals[target_idx]
                price = target_interval.get("total")
                if price is not None:
                    window_prices.append(price)
                    window_intervals.append(target_interval)

        # Calculate the aggregated price based on user preference
        if window_prices:
            aggregated_price = calculate_median(window_prices) if use_median else calculate_mean(window_prices)

            if aggregated_price is None:
                continue

            # Build data point with original field names
            data_point: dict[str, Any] = {
                "startsAt": start_time,
                "total": aggregated_price,
            }

            # Add aggregated level
            if window_intervals:
                aggregated_level = aggregate_level_data(window_intervals)
                if aggregated_level:
                    data_point["level"] = aggregated_level.upper()

            # Add aggregated rating_level
            if window_intervals:
                aggregated_rating = aggregate_rating_data(window_intervals, threshold_low, threshold_high)
                if aggregated_rating:
                    data_point["rating_level"] = aggregated_rating.upper()

            hourly_data.append(data_point)

    return hourly_data
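
# A usage sketch for aggregate_to_hourly (illustrative; "coordinator" is a
# stand-in exposing only the config_entry options read above, and the interval
# values are assumed):
#
#     from datetime import datetime
#
#     intervals = [
#         {"startsAt": datetime(2025, 1, 1, 9, 30), "total": 0.30, "level": "NORMAL"},
#         {"startsAt": datetime(2025, 1, 1, 9, 45), "total": 0.32, "level": "NORMAL"},
#         {"startsAt": datetime(2025, 1, 1, 10, 0), "total": 0.34, "level": "NORMAL"},
#         {"startsAt": datetime(2025, 1, 1, 10, 15), "total": 0.36, "level": "NORMAL"},
#         {"startsAt": datetime(2025, 1, 1, 10, 30), "total": 0.38, "level": "NORMAL"},
#     ]
#     aggregate_to_hourly(intervals, coordinator)
#     # Only 10:00 starts at :00, so one data point is produced; with the mean
#     # setting its "total" is (0.30 + 0.32 + 0.34 + 0.36 + 0.38) / 5 = 0.34,
#     # and "level"/"rating_level" come from the sensor aggregation helpers.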


def aggregate_hourly_exact(  # noqa: PLR0913, PLR0912, PLR0915
    intervals: list[dict],
    start_time_field: str,
    price_field: str,
    *,
    coordinator: Any,
    use_subunit_currency: bool = False,
    round_decimals: int | None = None,
    include_level: bool = False,
    include_rating_level: bool = False,
    level_filter: list[str] | None = None,
    rating_level_filter: list[str] | None = None,
    include_average: bool = False,
    level_field: str = "level",
    rating_level_field: str = "rating_level",
    average_field: str = "average",
    day_average: float | None = None,
    threshold_low: float = DEFAULT_PRICE_RATING_THRESHOLD_LOW,
    threshold_high: float = DEFAULT_PRICE_RATING_THRESHOLD_HIGH,
    period_timestamps: set[str] | None = None,
) -> list[dict]:
    """
    Aggregate 15-minute intervals to exact hourly averages.

    Each hour uses exactly 4 intervals (at :00, :15, :30, :45) and a data
    point is returned at the start of each hour.

    Args:
        intervals: List of 15-minute price intervals
        start_time_field: Custom name for start time field
        price_field: Custom name for price field
        coordinator: Data update coordinator instance (required)
        use_subunit_currency: Convert to subunit currency units (cents/øre)
        round_decimals: Optional decimal rounding
        include_level: Include aggregated level field
        include_rating_level: Include aggregated rating_level field
        level_filter: Filter intervals by level values
        rating_level_filter: Filter intervals by rating_level values
        include_average: Include day average in output
        level_field: Custom name for level field
        rating_level_field: Custom name for rating_level field
        average_field: Custom name for average field
        day_average: Day average value to include
        threshold_low: Rating level threshold (low/normal boundary)
        threshold_high: Rating level threshold (normal/high boundary)
        period_timestamps: Set of timestamps to filter by (period filter)

    Returns:
        List of hourly data points with aggregated values

    """
    if not intervals:
        return []

    hourly_data = []
    i = 0

    while i < len(intervals):
        interval = intervals[i]
        start_time_str = interval.get("startsAt")

        if not start_time_str:
            i += 1
            continue

        # Get the coordinator's time service under a distinct name so it does
        # not shadow the datetime.time import used elsewhere in this module.
        time_service = coordinator.time
        start_time = start_time_str  # Already a datetime in the local timezone
        if not start_time:
            i += 1
            continue

        # Check if this is the start of an hour (:00)
        if start_time.minute != 0:
            i += 1
            continue

        # Collect intervals for this hour (with optional filtering)
        intervals_per_hour = time_service.minutes_to_intervals(60)
        hour_intervals = []
        hour_interval_data = []  # Complete interval data for aggregation functions
        for j in range(intervals_per_hour):
            if i + j < len(intervals):
                interval = intervals[i + j]

                # Apply period filter if specified (check startsAt timestamp)
                if period_timestamps is not None:
                    interval_start = interval.get("startsAt")
                    if interval_start and interval_start not in period_timestamps:
                        continue

                # Apply level filter if specified
                if level_filter is not None and "level" in interval and interval["level"] not in level_filter:
                    continue

                # Apply rating_level filter if specified
                if (
                    rating_level_filter is not None
                    and "rating_level" in interval
                    and interval["rating_level"] not in rating_level_filter
                ):
                    continue

                price = interval.get("total")
                if price is not None:
                    hour_intervals.append(price)
                    hour_interval_data.append(interval)

        # Calculate the average if we have data
        if hour_intervals:
            avg_price = sum(hour_intervals) / len(hour_intervals)

            # Convert to subunit currency (cents/øre) if requested
            avg_price = round(avg_price * 100, 2) if use_subunit_currency else round(avg_price, 4)

            # Apply custom rounding if specified
            if round_decimals is not None:
                avg_price = round(avg_price, round_decimals)

            data_point = {
                start_time_field: start_time_str.isoformat()
                if hasattr(start_time_str, "isoformat")
                else start_time_str,
                price_field: avg_price,
            }

            # Add aggregated level using the same logic as sensors
            if include_level and hour_interval_data:
                aggregated_level = aggregate_level_data(hour_interval_data)
                if aggregated_level:
                    data_point[level_field] = aggregated_level.upper()  # Convert back to uppercase

            # Add aggregated rating_level using the same logic as sensors
            if include_rating_level and hour_interval_data:
                aggregated_rating = aggregate_rating_data(hour_interval_data, threshold_low, threshold_high)
                if aggregated_rating:
                    data_point[rating_level_field] = aggregated_rating.upper()  # Convert back to uppercase

            # Add the day average if requested
            if include_average and day_average is not None:
                data_point[average_field] = day_average

            hourly_data.append(data_point)

        # Move to the next hour (skip intervals_per_hour intervals)
        i += intervals_per_hour

    return hourly_data
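
# Note the contrast with aggregate_to_hourly above: aggregate_hourly_exact
# buckets each clock hour into exactly its own four intervals, while
# aggregate_to_hourly uses a rolling five-interval window centered on :00.
# A minimal call sketch (illustrative; the field names "time"/"price" are
# caller-chosen, and the coordinator/intervals are assumed):
#
#     aggregate_hourly_exact(
#         intervals,          # 15-minute intervals with "startsAt"/"total"
#         "time",             # custom start-time field name in the output
#         "price",            # custom price field name in the output
#         coordinator=coordinator,
#         use_subunit_currency=True,  # e.g. 0.34 EUR -> 34.0 ct
#     )
#     # -> [{"time": "2025-01-01T10:00:00+01:00", "price": 34.0}, ...]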


def get_period_data(  # noqa: PLR0913, PLR0912, PLR0915
    *,
    coordinator: Any,
    period_filter: str,
    days: list[str],
    output_format: str,
    subunit_currency: bool,
    round_decimals: int | None,
    level_filter: list[str] | None,
    rating_level_filter: list[str] | None,
    include_level: bool,
    include_rating_level: bool,
    start_time_field: str,
    end_time_field: str,
    price_field: str,
    level_field: str,
    rating_level_field: str,
    data_key: str,
    insert_nulls: str,
    add_trailing_null: bool,
) -> dict[str, Any]:
    """
    Get period summary data instead of interval data.

    When period_filter is specified, returns the precomputed period summaries
    from the coordinator instead of filtering intervals.

    Note: Period prices (price_median) are stored in base currency units (€/kr/$/£).
    They are converted to subunit currency units (ct/øre/¢/p) if subunit_currency=True.

    Args:
        coordinator: Data coordinator with period summaries
        period_filter: "best_price" or "peak_price"
        days: List of days to include
        output_format: "array_of_objects" or "array_of_arrays"
        subunit_currency: If True, convert prices from base to subunit currency units
        round_decimals: Optional decimal rounding
        level_filter: Optional level filter
        rating_level_filter: Optional rating level filter
        include_level: Whether to include level field in output
        include_rating_level: Whether to include rating_level field in output
        start_time_field: Custom name for start time field
        end_time_field: Custom name for end time field
        price_field: Custom name for price field
        level_field: Custom name for level field
        rating_level_field: Custom name for rating_level field
        data_key: Top-level key name in response
        insert_nulls: NULL insertion mode ("none", "segments", "all")
        add_trailing_null: Whether to add trailing null point

    Returns:
        Dictionary with period data in requested format

    """
    periods_data = coordinator.data.get("pricePeriods", {})
    period_data = periods_data.get(period_filter)

    if not period_data:
        return {data_key: []}

    period_summaries = period_data.get("periods", [])
    if not period_summaries:
        return {data_key: []}

    chart_data = []

    # Filter periods by day if requested
    filtered_periods = []
    if days:
        # Use the helper to get intervals for the requested days and extract their dates.
        # Map day keys to offsets: yesterday=-1, today=0, tomorrow=1
        day_offset_map = {"yesterday": -1, "today": 0, "tomorrow": 1}
        offsets = [day_offset_map[day] for day in days]
        day_intervals = get_intervals_for_day_offsets(coordinator.data, offsets)
        allowed_dates = {interval["startsAt"].date() for interval in day_intervals if interval.get("startsAt")}

        # Calculate day boundaries for trimming:
        # find the min/max dates to determine the overall requested window.
        if allowed_dates:
            min_date = min(allowed_dates)
            max_date = max(allowed_dates)

            # CRITICAL: Trim periods that span day boundaries.
            # Window start = midnight of the first requested day.
            # Window end = the last microsecond of the last requested day
            # (time.max is 23:59:59.999999, an inclusive boundary).
            window_start = datetime.combine(min_date, time.min)
            window_end = datetime.combine(max_date, time.max)

            # Make timezone-aware using the coordinator's time service
            window_start = coordinator.time.as_local(window_start)
            window_end = coordinator.time.as_local(window_end)

            # Filter and trim periods to the window
            for period in period_summaries:
                start = period.get("start")
                end = period.get("end")

                if not start:
                    continue

                # Skip periods that end before the window or start after it
                if end and end <= window_start:
                    continue
                if start >= window_end:
                    continue

                # Trim the period to the window boundaries
                trimmed_period = period.copy()
                if start < window_start:
                    trimmed_period["start"] = window_start
                if end and end > window_end:
                    trimmed_period["end"] = window_end

                filtered_periods.append(trimmed_period)
    else:
        filtered_periods = period_summaries
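
    # Illustrative example of the trimming above (values assumed): with
    # days=["today"] and today = 2025-01-02, a best_price period running
    # 2025-01-01 22:00 -> 2025-01-02 02:00 is kept, but its "start" is clamped
    # to 2025-01-02 00:00 (local midnight) so it does not leak into yesterday.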

    # Apply level and rating_level filters
    for period in filtered_periods:
        # Apply the level filter (normalize to uppercase for comparison)
        if level_filter and "level" in period and period["level"].upper() not in level_filter:
            continue

        # Apply the rating_level filter (normalize to uppercase for comparison)
        if (
            rating_level_filter
            and "rating_level" in period
            and period["rating_level"].upper() not in rating_level_filter
        ):
            continue

        # Build the data point based on the output format
        if output_format == "array_of_objects":
            # Map period fields to custom field names.
            # A period has: start, end, level, rating_level, price_mean, price_median, price_min, price_max
            data_point = {}

            # Start time
            start = period["start"]
            data_point[start_time_field] = start.isoformat() if hasattr(start, "isoformat") else start

            # End time
            end = period.get("end")
            data_point[end_time_field] = end.isoformat() if end and hasattr(end, "isoformat") else end

            # Price (use price_median from the period for visual consistency with sensor states).
            # The median is more representative than the mean for periods with gap tolerance
            # (single "normal" intervals between cheap/expensive ones don't skew the display).
            price_median = period.get("price_median", 0.0)
            # Convert to subunit currency if subunit_currency=True (periods are stored in base currency)
            if subunit_currency:
                price_median = price_median * 100
            # Apply rounding: use round_decimals if provided, otherwise the default precision
            precision = round_decimals if round_decimals is not None else (2 if subunit_currency else 4)
            price_median = round(price_median, precision)
            data_point[price_field] = price_median

            # Level (only if requested and present)
            if include_level and "level" in period:
                data_point[level_field] = period["level"].upper()

            # Rating level (only if requested and present)
            if include_rating_level and "rating_level" in period:
                data_point[rating_level_field] = period["rating_level"].upper()

            chart_data.append(data_point)

        else:  # array_of_arrays
            # For array_of_arrays, include 2-3 points per period depending on insert_nulls.
            # Always:
            #   1. Start time with price (begin period)
            #   2. End time with price (hold price until end)
            # If insert_nulls is 'segments' or 'all':
            #   3. End time with NULL (cleanly terminate the segment for ApexCharts)
            # Use price_median for consistency with sensor states (more representative for periods)
            price_median = period.get("price_median", 0.0)
            # Convert to subunit currency if subunit_currency=True (periods are stored in base currency)
            if subunit_currency:
                price_median = price_median * 100
            # Apply rounding: use round_decimals if provided, otherwise the default precision
            precision = round_decimals if round_decimals is not None else (2 if subunit_currency else 4)
            price_median = round(price_median, precision)
            start = period["start"]
            end = period.get("end")
            start_serialized = start.isoformat() if hasattr(start, "isoformat") else start
            end_serialized = end.isoformat() if end and hasattr(end, "isoformat") else end

            # Add the data points for this period
            chart_data.append([start_serialized, price_median])  # 1. Start with price
            if end_serialized:
                chart_data.append([end_serialized, price_median])  # 2. End with price (hold level)
                # 3. Add the NULL terminator only if insert_nulls is enabled
                if insert_nulls in ("segments", "all"):
                    chart_data.append([end_serialized, None])  # 3. End with NULL (terminate segment)

    # Add a trailing null point if requested (independent of insert_nulls).
    # This adds an additional NULL at the end of the entire data series.
    # If both insert_nulls and add_trailing_null are enabled, you get:
    #   - a NULL terminator after each period (from insert_nulls)
    #   - an additional NULL at the very end (from add_trailing_null)
    if add_trailing_null and chart_data:
        if output_format == "array_of_objects":
            null_point = {start_time_field: None, end_time_field: None}
            for field in [price_field, level_field, rating_level_field]:
                null_point[field] = None
            chart_data.append(null_point)
        else:  # array_of_arrays
            chart_data.append([None, None])

    return {data_key: chart_data}
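
# A sketch of the array_of_arrays shape this produces (values assumed: one
# best_price period 12:00-14:00 with price_median 0.25 EUR and
# insert_nulls="segments"; the remaining required keyword arguments are
# elided with ... for brevity):
#
#     get_period_data(
#         coordinator=coordinator,
#         period_filter="best_price",
#         days=["today"],
#         output_format="array_of_arrays",
#         data_key="data",
#         insert_nulls="segments",
#         add_trailing_null=False,
#         ...,
#     )
#     # -> {"data": [["2025-01-02T12:00:00+01:00", 0.25],
#     #              ["2025-01-02T14:00:00+01:00", 0.25],
#     #              ["2025-01-02T14:00:00+01:00", None]]}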


def get_level_translation(level_key: str, level_type: str, language: str) -> str:
    """Get translated name for a price level or rating level."""
    level_key_lower = level_key.lower()
    # Use the correct translation key based on level_type
    if level_type == "rating_level":
        name = get_translation(["selector", "rating_level_filter", "options", level_key_lower], language)
    else:
        name = get_translation(["selector", "level_filter", "options", level_key_lower], language)
    # Fall back to the original key if no translation is found
    return name or level_key
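
# Example (illustrative): get_level_translation("CHEAP", "level", "de") looks up
# selector.level_filter.options.cheap in the bundled translations and falls back
# to the original key "CHEAP" if no translation is found.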