hass.tibber_prices/custom_components/tibber_prices/services.py
Julian Pawlowski 38ce1c4c50 feat(chart_export): add Chart Data Export diagnostic sensor
Added optional diagnostic binary sensor that exposes get_chartdata
service results as entity attributes for legacy dashboard tools.

Key features:
- Entity: binary_sensor.tibber_home_NAME_chart_data_export
- Configurable via Options Flow Step 7 (YAML parameters)
- Calls get_chartdata service with user configuration
- Exposes response as attributes for chart cards
- Disabled by default (opt-in)
- Auto-refreshes on coordinator updates
- Manual refresh via homeassistant.update_entity

Implementation details:
- Added chart_data_export entity description to definitions.py
- Implemented state/attribute logic in binary_sensor/core.py
- Added YAML configuration schema in schemas.py
- Added validation in options_flow.py (Step 7)
- Service call validation with detailed error messages
- Attribute ordering: metadata first, descriptions next, service data last
- Dynamic icon mapping (database-export/database-alert)

Translations:
- Added chart_data_export_config to all 5 languages
- Added Step 7 descriptions with legacy warning
- Added invalid_yaml_syntax/invalid_yaml_structure error messages
- Added custom_translations for sensor descriptions

Documentation:
- Added Chart Data Export section to sensors.md
- Added comprehensive service guide to services.md
- Migration path from sensor to service
- Configuration instructions via Options Flow

Impact: Provides backward compatibility for dashboard tools that can
only read entity attributes (e.g., older ApexCharts versions). New
integrations should use tibber_prices.get_chartdata service directly.
2025-11-17 03:14:02 +00:00

1096 lines
45 KiB
Python

"""Services for Tibber Prices integration."""
from __future__ import annotations
import re
from datetime import timedelta
from typing import Any, Final
import voluptuous as vol
from homeassistant.core import HomeAssistant, ServiceCall, SupportsResponse, callback
from homeassistant.exceptions import ServiceValidationError
from homeassistant.helpers.entity_registry import async_get as async_get_entity_registry
from homeassistant.util import dt as dt_util
from .api import (
TibberPricesApiClientAuthenticationError,
TibberPricesApiClientCommunicationError,
TibberPricesApiClientError,
)
from .const import (
CONF_PRICE_RATING_THRESHOLD_HIGH,
CONF_PRICE_RATING_THRESHOLD_LOW,
DEFAULT_PRICE_RATING_THRESHOLD_HIGH,
DEFAULT_PRICE_RATING_THRESHOLD_LOW,
DOMAIN,
PRICE_LEVEL_CHEAP,
PRICE_LEVEL_EXPENSIVE,
PRICE_LEVEL_NORMAL,
PRICE_LEVEL_VERY_CHEAP,
PRICE_LEVEL_VERY_EXPENSIVE,
PRICE_RATING_HIGH,
PRICE_RATING_LOW,
PRICE_RATING_NORMAL,
format_price_unit_minor,
get_translation,
)
from .sensor.helpers import aggregate_level_data, aggregate_rating_data
APEXCHARTS_YAML_SERVICE_NAME = "get_apexcharts_yaml"
CHARTDATA_SERVICE_NAME = "get_chartdata"
REFRESH_USER_DATA_SERVICE_NAME = "refresh_user_data"
ATTR_DAY: Final = "day"
ATTR_ENTRY_ID: Final = "entry_id"
APEXCHARTS_SERVICE_SCHEMA: Final = vol.Schema(
{
vol.Required(ATTR_ENTRY_ID): str,
vol.Optional("day", default="today"): vol.In(["yesterday", "today", "tomorrow"]),
vol.Optional("level_type", default="rating_level"): vol.In(["rating_level", "level"]),
}
)
def _normalize_level_filter(value: list[str] | None) -> list[str] | None:
"""Convert level filter values to uppercase for case-insensitive comparison."""
if value is None:
return None
return [v.upper() for v in value]
def _normalize_rating_level_filter(value: list[str] | None) -> list[str] | None:
"""Convert rating level filter values to uppercase for case-insensitive comparison."""
if value is None:
return None
return [v.upper() for v in value]
CHARTDATA_SERVICE_SCHEMA: Final = vol.Schema(
{
vol.Required(ATTR_ENTRY_ID): str,
vol.Optional(ATTR_DAY): vol.All(vol.Coerce(list), [vol.In(["yesterday", "today", "tomorrow"])]),
vol.Optional("resolution", default="interval"): vol.In(["interval", "hourly"]),
vol.Optional("output_format", default="array_of_objects"): vol.In(["array_of_objects", "array_of_arrays"]),
vol.Optional("array_fields"): str,
vol.Optional("minor_currency", default=False): bool,
vol.Optional("round_decimals"): vol.All(vol.Coerce(int), vol.Range(min=0, max=10)),
vol.Optional("include_level", default=False): bool,
vol.Optional("include_rating_level", default=False): bool,
vol.Optional("include_average", default=False): bool,
vol.Optional("level_filter"): vol.All(
vol.Coerce(list),
_normalize_level_filter,
[
vol.In(
[
PRICE_LEVEL_VERY_CHEAP,
PRICE_LEVEL_CHEAP,
PRICE_LEVEL_NORMAL,
PRICE_LEVEL_EXPENSIVE,
PRICE_LEVEL_VERY_EXPENSIVE,
]
)
],
),
vol.Optional("rating_level_filter"): vol.All(
vol.Coerce(list),
_normalize_rating_level_filter,
[vol.In([PRICE_RATING_LOW, PRICE_RATING_NORMAL, PRICE_RATING_HIGH])],
),
vol.Optional("insert_nulls", default="none"): vol.In(["none", "segments", "all"]),
vol.Optional("add_trailing_null", default=False): bool,
vol.Optional("period_filter"): vol.In(["best_price", "peak_price"]),
vol.Optional("start_time_field", default="start_time"): str,
vol.Optional("end_time_field", default="end_time"): str,
vol.Optional("price_field", default="price_per_kwh"): str,
vol.Optional("level_field", default="level"): str,
vol.Optional("rating_level_field", default="rating_level"): str,
vol.Optional("average_field", default="average"): str,
vol.Optional("data_key", default="data"): str,
}
)
REFRESH_USER_DATA_SERVICE_SCHEMA: Final = vol.Schema(
{
vol.Required(ATTR_ENTRY_ID): str,
}
)
# --- Entry point: Service handler ---
def _aggregate_hourly_exact( # noqa: PLR0913, PLR0912, PLR0915
intervals: list[dict],
start_time_field: str,
price_field: str,
*,
use_minor_currency: bool = False,
round_decimals: int | None = None,
include_level: bool = False,
include_rating_level: bool = False,
level_filter: list[str] | None = None,
rating_level_filter: list[str] | None = None,
include_average: bool = False,
level_field: str = "level",
rating_level_field: str = "rating_level",
average_field: str = "average",
day_average: float | None = None,
threshold_low: float = DEFAULT_PRICE_RATING_THRESHOLD_LOW,
threshold_high: float = DEFAULT_PRICE_RATING_THRESHOLD_HIGH,
period_timestamps: set[str] | None = None,
) -> list[dict]:
"""
Aggregate 15-minute intervals to exact hourly averages.
Each hour uses exactly 4 intervals (00:00, 00:15, 00:30, 00:45).
Returns data points at the start of each hour.
"""
if not intervals:
return []
hourly_data = []
i = 0
while i < len(intervals):
interval = intervals[i]
start_time_str = interval.get("startsAt")
if not start_time_str:
i += 1
continue
# Parse the timestamp
start_time = dt_util.parse_datetime(start_time_str)
if not start_time:
i += 1
continue
# Check if this is the start of an hour (:00)
if start_time.minute != 0:
i += 1
continue
# Collect 4 intervals for this hour (with optional filtering)
hour_intervals = []
hour_interval_data = [] # Complete interval data for aggregation functions
for j in range(4):
if i + j < len(intervals):
interval = intervals[i + j]
# Apply period filter if specified (check startsAt timestamp)
if period_timestamps is not None:
interval_start = interval.get("startsAt")
if interval_start and interval_start not in period_timestamps:
continue
# Apply level filter if specified
if level_filter is not None and "level" in interval and interval["level"] not in level_filter:
continue
# Apply rating_level filter if specified
if (
rating_level_filter is not None
and "rating_level" in interval
and interval["rating_level"] not in rating_level_filter
):
continue
price = interval.get("total")
if price is not None:
hour_intervals.append(price)
hour_interval_data.append(interval)
# Calculate average if we have data
if hour_intervals:
avg_price = sum(hour_intervals) / len(hour_intervals)
# Convert to minor currency (cents/øre) if requested
avg_price = round(avg_price * 100, 2) if use_minor_currency else round(avg_price, 4)
# Apply custom rounding if specified
if round_decimals is not None:
avg_price = round(avg_price, round_decimals)
data_point = {start_time_field: start_time_str, price_field: avg_price}
# Add aggregated level using same logic as sensors
if include_level and hour_interval_data:
aggregated_level = aggregate_level_data(hour_interval_data)
if aggregated_level:
data_point[level_field] = aggregated_level.upper() # Convert back to uppercase
# Add aggregated rating_level using same logic as sensors
if include_rating_level and hour_interval_data:
aggregated_rating = aggregate_rating_data(hour_interval_data, threshold_low, threshold_high)
if aggregated_rating:
data_point[rating_level_field] = aggregated_rating.upper() # Convert back to uppercase
# Add average if requested
if include_average and day_average is not None:
data_point[average_field] = day_average
hourly_data.append(data_point)
# Move to next hour (skip 4 intervals)
i += 4
return hourly_data
def _get_period_data( # noqa: PLR0913, PLR0912, PLR0915
*,
coordinator: Any,
period_filter: str,
days: list[str],
output_format: str,
minor_currency: bool,
round_decimals: int | None,
level_filter: list[str] | None,
rating_level_filter: list[str] | None,
include_level: bool,
include_rating_level: bool,
start_time_field: str,
end_time_field: str,
price_field: str,
level_field: str,
rating_level_field: str,
data_key: str,
add_trailing_null: bool,
) -> dict[str, Any]:
"""
Get period summary data instead of interval data.
When period_filter is specified, returns the precomputed period summaries
from the coordinator instead of filtering intervals.
Note: Period prices (price_avg) are stored in minor currency units (ct/øre).
They are converted to major currency unless minor_currency=True.
Args:
coordinator: Data coordinator with period summaries
period_filter: "best_price" or "peak_price"
days: List of days to include
output_format: "array_of_objects" or "array_of_arrays"
minor_currency: If False, convert prices from minor to major units
round_decimals: Optional decimal rounding
level_filter: Optional level filter
rating_level_filter: Optional rating level filter
include_level: Whether to include level field in output
include_rating_level: Whether to include rating_level field in output
start_time_field: Custom name for start time field
end_time_field: Custom name for end time field
price_field: Custom name for price field
level_field: Custom name for level field
rating_level_field: Custom name for rating_level field
data_key: Top-level key name in response
add_trailing_null: Whether to add trailing null point
Returns:
Dictionary with period data in requested format
"""
periods_data = coordinator.data.get("periods", {})
period_data = periods_data.get(period_filter)
if not period_data:
return {data_key: []}
period_summaries = period_data.get("periods", [])
if not period_summaries:
return {data_key: []}
chart_data = []
# Filter periods by day if requested
filtered_periods = []
if days:
# Build set of allowed dates
allowed_dates = set()
for day in days:
# Map day names to actual dates from coordinator
price_info = coordinator.data.get("priceInfo", {})
day_prices = price_info.get(day, [])
if day_prices:
# Extract date from first interval
first_interval = day_prices[0]
starts_at = first_interval.get("startsAt")
if starts_at:
dt = dt_util.parse_datetime(starts_at)
if dt:
dt = dt_util.as_local(dt)
allowed_dates.add(dt.date())
# Filter periods to those within allowed dates
for period in period_summaries:
start = period.get("start")
if start and start.date() in allowed_dates:
filtered_periods.append(period)
else:
filtered_periods = period_summaries
# Apply level and rating_level filters
for period in filtered_periods:
# Apply level filter (normalize to uppercase for comparison)
if level_filter and "level" in period and period["level"].upper() not in level_filter:
continue
# Apply rating_level filter (normalize to uppercase for comparison)
if (
rating_level_filter
and "rating_level" in period
and period["rating_level"].upper() not in rating_level_filter
):
continue
# Build data point based on output format
if output_format == "array_of_objects":
# Map period fields to custom field names
# Period has: start, end, level, rating_level, price_avg, price_min, price_max
data_point = {}
# Start time
data_point[start_time_field] = period["start"]
# End time
data_point[end_time_field] = period.get("end")
# Price (use price_avg from period, stored in minor units)
price_avg = period.get("price_avg", 0.0)
# Convert to major currency unless minor_currency=True
if not minor_currency:
price_avg = price_avg / 100
if round_decimals is not None:
price_avg = round(price_avg, round_decimals)
data_point[price_field] = price_avg
# Level (only if requested and present)
if include_level and "level" in period:
data_point[level_field] = period["level"].upper()
# Rating level (only if requested and present)
if include_rating_level and "rating_level" in period:
data_point[rating_level_field] = period["rating_level"].upper()
chart_data.append(data_point)
else: # array_of_arrays
# For array_of_arrays, include: [start, price_avg]
price_avg = period.get("price_avg", 0.0)
# Convert to major currency unless minor_currency=True
if not minor_currency:
price_avg = price_avg / 100
if round_decimals is not None:
price_avg = round(price_avg, round_decimals)
chart_data.append([period["start"], price_avg])
# Add trailing null point if requested
if add_trailing_null and chart_data:
if output_format == "array_of_objects":
null_point = {start_time_field: None, end_time_field: None}
for field in [price_field, level_field, rating_level_field]:
null_point[field] = None
chart_data.append(null_point)
else: # array_of_arrays
chart_data.append([None, None])
return {data_key: chart_data}
async def _get_chartdata(call: ServiceCall) -> dict[str, Any]: # noqa: PLR0912, PLR0915, C901
"""Return price data in a simple chart-friendly format similar to Tibber Core integration."""
hass = call.hass
entry_id_raw = call.data.get(ATTR_ENTRY_ID)
if entry_id_raw is None:
raise ServiceValidationError(translation_domain=DOMAIN, translation_key="missing_entry_id")
entry_id: str = str(entry_id_raw)
days_raw = call.data.get(ATTR_DAY)
# If no day specified, return all available data (today + tomorrow)
if days_raw is None:
days = ["today", "tomorrow"]
# Convert single string to list for uniform processing
elif isinstance(days_raw, str):
days = [days_raw]
else:
days = days_raw
start_time_field = call.data.get("start_time_field", "start_time")
end_time_field = call.data.get("end_time_field", "end_time")
price_field = call.data.get("price_field", "price_per_kwh")
level_field = call.data.get("level_field", "level")
rating_level_field = call.data.get("rating_level_field", "rating_level")
average_field = call.data.get("average_field", "average")
data_key = call.data.get("data_key", "data")
resolution = call.data.get("resolution", "interval")
output_format = call.data.get("output_format", "array_of_objects")
minor_currency = call.data.get("minor_currency", False)
round_decimals = call.data.get("round_decimals")
include_level = call.data.get("include_level", False)
include_rating_level = call.data.get("include_rating_level", False)
include_average = call.data.get("include_average", False)
insert_nulls = call.data.get("insert_nulls", "none")
add_trailing_null = call.data.get("add_trailing_null", False)
period_filter = call.data.get("period_filter")
# Filter values are already normalized to uppercase by schema validators
level_filter = call.data.get("level_filter")
rating_level_filter = call.data.get("rating_level_filter")
# If array_fields is specified, implicitly enable fields that are used
array_fields_template = call.data.get("array_fields")
if array_fields_template and output_format == "array_of_arrays":
if level_field in array_fields_template:
include_level = True
if rating_level_field in array_fields_template:
include_rating_level = True
if average_field in array_fields_template:
include_average = True
_, coordinator, _ = _get_entry_and_data(hass, entry_id)
# Get thresholds from config for rating aggregation
threshold_low = coordinator.config_entry.options.get(
CONF_PRICE_RATING_THRESHOLD_LOW, DEFAULT_PRICE_RATING_THRESHOLD_LOW
)
threshold_high = coordinator.config_entry.options.get(
CONF_PRICE_RATING_THRESHOLD_HIGH, DEFAULT_PRICE_RATING_THRESHOLD_HIGH
)
# === SPECIAL HANDLING: Period Filter ===
# When period_filter is set, return period summaries instead of interval data
# Period summaries are already complete objects with aggregated data
if period_filter:
return _get_period_data(
coordinator=coordinator,
period_filter=period_filter,
days=days,
output_format=output_format,
minor_currency=minor_currency,
round_decimals=round_decimals,
level_filter=level_filter,
rating_level_filter=rating_level_filter,
include_level=include_level,
include_rating_level=include_rating_level,
start_time_field=start_time_field,
end_time_field=end_time_field,
price_field=price_field,
level_field=level_field,
rating_level_field=rating_level_field,
data_key=data_key,
add_trailing_null=add_trailing_null,
)
# === NORMAL HANDLING: Interval Data ===
# Get price data for all requested days
price_info = coordinator.data.get("priceInfo", {})
chart_data = []
# Build set of timestamps that match period_filter if specified
period_timestamps = None
if period_filter:
period_timestamps = set()
periods_data = coordinator.data.get("periods", {})
period_data = periods_data.get(period_filter)
if period_data:
period_summaries = period_data.get("periods", [])
# Period summaries don't contain intervals, only start/end timestamps
# Build set of all 15-minute intervals within period ranges
for period_summary in period_summaries:
start = period_summary.get("start")
end = period_summary.get("end")
if start and end:
# Generate all 15-minute timestamps within this period
current = start
while current < end:
period_timestamps.add(current.isoformat())
current = current + timedelta(minutes=15)
# Collect all timestamps if insert_nulls='all' (needed to insert NULLs for missing filter matches)
all_timestamps = set()
if insert_nulls == "all" and (level_filter or rating_level_filter):
for day in days:
day_prices = price_info.get(day, [])
for interval in day_prices:
start_time = interval.get("startsAt")
if start_time:
all_timestamps.add(start_time)
all_timestamps = sorted(all_timestamps)
# Calculate average if requested
day_averages = {}
if include_average:
for day in days:
day_prices = price_info.get(day, [])
if day_prices:
prices = [p["total"] for p in day_prices if p.get("total") is not None]
if prices:
avg = sum(prices) / len(prices)
# Apply same transformations as to regular prices
avg = round(avg * 100, 2) if minor_currency else round(avg, 4)
if round_decimals is not None:
avg = round(avg, round_decimals)
day_averages[day] = avg
for day in days:
day_prices = price_info.get(day, [])
if resolution == "interval":
# Original 15-minute intervals
if insert_nulls == "all" and (level_filter or rating_level_filter):
# Mode 'all': Insert NULL for all timestamps where filter doesn't match
# Build a map of timestamp -> interval for quick lookup
interval_map = {
interval.get("startsAt"): interval for interval in day_prices if interval.get("startsAt")
}
# Process all timestamps, filling gaps with NULL
for start_time in all_timestamps:
interval = interval_map.get(start_time)
if interval is None:
# No data for this timestamp - skip entirely
continue
price = interval.get("total")
if price is None:
continue
# Check if this interval matches the filter
matches_filter = False
if level_filter and "level" in interval:
matches_filter = interval["level"] in level_filter
elif rating_level_filter and "rating_level" in interval:
matches_filter = interval["rating_level"] in rating_level_filter
# If filter is set but doesn't match, insert NULL price
if not matches_filter:
price = None
elif price is not None:
# Convert to minor currency (cents/øre) if requested
price = round(price * 100, 2) if minor_currency else round(price, 4)
# Apply custom rounding if specified
if round_decimals is not None:
price = round(price, round_decimals)
data_point = {start_time_field: start_time, price_field: price}
# Add level if requested (only when price is not NULL)
if include_level and "level" in interval and price is not None:
data_point[level_field] = interval["level"]
# Add rating_level if requested (only when price is not NULL)
if include_rating_level and "rating_level" in interval and price is not None:
data_point[rating_level_field] = interval["rating_level"]
# Add average if requested
if include_average and day in day_averages:
data_point[average_field] = day_averages[day]
chart_data.append(data_point)
elif insert_nulls == "segments" and (level_filter or rating_level_filter):
# Mode 'segments': Add NULL points at segment boundaries for clean gaps
# Determine which field to check based on filter type
filter_field = "rating_level" if rating_level_filter else "level"
filter_values = rating_level_filter if rating_level_filter else level_filter
for i in range(len(day_prices) - 1):
interval = day_prices[i]
next_interval = day_prices[i + 1]
start_time = interval.get("startsAt")
price = interval.get("total")
next_start_time = next_interval.get("startsAt")
if start_time is None or price is None:
continue
interval_value = interval.get(filter_field)
next_value = next_interval.get(filter_field)
# Check if current interval matches filter
if interval_value in filter_values:
# Convert price
converted_price = round(price * 100, 2) if minor_currency else round(price, 4)
if round_decimals is not None:
converted_price = round(converted_price, round_decimals)
# Add current point
data_point = {start_time_field: start_time, price_field: converted_price}
if include_level and "level" in interval:
data_point[level_field] = interval["level"]
if include_rating_level and "rating_level" in interval:
data_point[rating_level_field] = interval["rating_level"]
if include_average and day in day_averages:
data_point[average_field] = day_averages[day]
chart_data.append(data_point)
# Check if next interval is different level (segment boundary)
if next_value != interval_value:
# Hold current price until next timestamp (stepline effect)
hold_point = {start_time_field: next_start_time, price_field: converted_price}
if include_level and "level" in interval:
hold_point[level_field] = interval["level"]
if include_rating_level and "rating_level" in interval:
hold_point[rating_level_field] = interval["rating_level"]
if include_average and day in day_averages:
hold_point[average_field] = day_averages[day]
chart_data.append(hold_point)
# Add NULL point to create gap
null_point = {start_time_field: next_start_time, price_field: None}
chart_data.append(null_point)
# Handle last interval of the day - extend to midnight
if day_prices:
last_interval = day_prices[-1]
last_start_time = last_interval.get("startsAt")
last_price = last_interval.get("total")
last_value = last_interval.get(filter_field)
if last_start_time and last_price is not None and last_value in filter_values:
# Parse timestamp and calculate midnight of next day
last_dt = dt_util.parse_datetime(last_start_time)
if last_dt:
last_dt = dt_util.as_local(last_dt)
# Calculate next day at 00:00
next_day = last_dt.replace(hour=0, minute=0, second=0, microsecond=0)
next_day = next_day + timedelta(days=1)
midnight_timestamp = next_day.isoformat()
# Try to get real price from tomorrow's first interval
next_day_name = None
if day == "yesterday":
next_day_name = "today"
elif day == "today":
next_day_name = "tomorrow"
# For "tomorrow", we don't have a "day after tomorrow"
midnight_price = None
midnight_interval = None
if next_day_name:
next_day_prices = price_info.get(next_day_name, [])
if next_day_prices:
first_next = next_day_prices[0]
first_next_value = first_next.get(filter_field)
# Only use tomorrow's price if it matches the same filter
if first_next_value == last_value:
midnight_price = first_next.get("total")
midnight_interval = first_next
# Fallback: use last interval's price if no tomorrow data or different level
if midnight_price is None:
midnight_price = last_price
midnight_interval = last_interval
# Convert price
converted_price = (
round(midnight_price * 100, 2) if minor_currency else round(midnight_price, 4)
)
if round_decimals is not None:
converted_price = round(converted_price, round_decimals)
# Add point at midnight with appropriate price (extends graph to end of day)
end_point = {start_time_field: midnight_timestamp, price_field: converted_price}
if midnight_interval is not None:
if include_level and "level" in midnight_interval:
end_point[level_field] = midnight_interval["level"]
if include_rating_level and "rating_level" in midnight_interval:
end_point[rating_level_field] = midnight_interval["rating_level"]
if include_average and day in day_averages:
end_point[average_field] = day_averages[day]
chart_data.append(end_point)
else:
# Mode 'none' (default): Only return matching intervals, no NULL insertion
for interval in day_prices:
start_time = interval.get("startsAt")
price = interval.get("total")
if start_time is not None and price is not None:
# Apply period filter if specified
if (
period_filter is not None
and period_timestamps is not None
and start_time not in period_timestamps
):
continue
# Apply level filter if specified
if level_filter is not None and "level" in interval and interval["level"] not in level_filter:
continue
# Apply rating_level filter if specified
if (
rating_level_filter is not None
and "rating_level" in interval
and interval["rating_level"] not in rating_level_filter
):
continue
# Convert to minor currency (cents/øre) if requested
price = round(price * 100, 2) if minor_currency else round(price, 4)
# Apply custom rounding if specified
if round_decimals is not None:
price = round(price, round_decimals)
data_point = {start_time_field: start_time, price_field: price}
# Add level if requested
if include_level and "level" in interval:
data_point[level_field] = interval["level"]
# Add rating_level if requested
if include_rating_level and "rating_level" in interval:
data_point[rating_level_field] = interval["rating_level"]
# Add average if requested
if include_average and day in day_averages:
data_point[average_field] = day_averages[day]
chart_data.append(data_point)
elif resolution == "hourly":
# Hourly averages (4 intervals per hour: :00, :15, :30, :45)
chart_data.extend(
_aggregate_hourly_exact(
day_prices,
start_time_field,
price_field,
use_minor_currency=minor_currency,
round_decimals=round_decimals,
include_level=include_level,
include_rating_level=include_rating_level,
level_filter=level_filter,
rating_level_filter=rating_level_filter,
include_average=include_average,
level_field=level_field,
rating_level_field=rating_level_field,
average_field=average_field,
day_average=day_averages.get(day),
threshold_low=threshold_low,
period_timestamps=period_timestamps,
threshold_high=threshold_high,
)
)
# Convert to array of arrays format if requested
if output_format == "array_of_arrays":
array_fields_template = call.data.get("array_fields")
# Default: nur timestamp und price
if not array_fields_template:
array_fields_template = f"{{{start_time_field}}}, {{{price_field}}}"
# Parse template to extract field names
field_pattern = re.compile(r"\{([^}]+)\}")
field_names = field_pattern.findall(array_fields_template)
if not field_names:
raise ServiceValidationError(
translation_domain=DOMAIN,
translation_key="invalid_array_fields",
translation_placeholders={"template": array_fields_template},
)
# Convert to [[field1, field2, ...], ...] format
points = []
for item in chart_data:
row = []
for field_name in field_names:
# Get value from item, or None if field doesn't exist
value = item.get(field_name)
row.append(value)
points.append(row)
# Add final null point for stepline rendering if requested
# (some chart libraries need this to prevent extrapolation to viewport edge)
if add_trailing_null and points:
null_row = [points[-1][0]] + [None] * (len(field_names) - 1)
points.append(null_row)
return {data_key: points}
# Add trailing null point for array_of_objects format if requested
if add_trailing_null and chart_data:
# Create a null point with only timestamp from last item, all other fields as None
last_item = chart_data[-1]
null_point = {start_time_field: last_item.get(start_time_field)}
# Set all other potential fields to None
for field in [price_field, level_field, rating_level_field, average_field]:
if field in last_item:
null_point[field] = None
chart_data.append(null_point)
return {data_key: chart_data}
def _get_level_translation(level_key: str, level_type: str, language: str) -> str:
"""Get translated name for a price level or rating level."""
level_key_lower = level_key.lower()
# Use correct translation key based on level_type
if level_type == "rating_level":
name = get_translation(["selector", "rating_level_filter", "options", level_key_lower], language)
else:
name = get_translation(["selector", "level_filter", "options", level_key_lower], language)
# Fallback to original key if translation not found
return name or level_key
async def _get_apexcharts_yaml(call: ServiceCall) -> dict[str, Any]:
"""Return a YAML snippet for an ApexCharts card using the get_apexcharts_data service for each level."""
hass = call.hass
entry_id_raw = call.data.get(ATTR_ENTRY_ID)
if entry_id_raw is None:
raise ServiceValidationError(translation_domain=DOMAIN, translation_key="missing_entry_id")
entry_id: str = str(entry_id_raw)
day = call.data.get("day", "today")
level_type = call.data.get("level_type", "rating_level")
# Get user's language from hass config
user_language = hass.config.language or "en"
# Get coordinator to access price data (for currency)
_, coordinator, _ = _get_entry_and_data(hass, entry_id)
price_info = coordinator.data.get("priceInfo", {})
currency = price_info.get("currency", "EUR")
price_unit = format_price_unit_minor(currency)
# Get a sample entity_id for the series (first sensor from this entry)
entity_registry = async_get_entity_registry(hass)
sample_entity = None
for entity in entity_registry.entities.values():
if entity.config_entry_id == entry_id and entity.domain == "sensor":
sample_entity = entity.entity_id
break
if level_type == "rating_level":
series_levels = [
(PRICE_RATING_LOW, "#2ecc71"),
(PRICE_RATING_NORMAL, "#f1c40f"),
(PRICE_RATING_HIGH, "#e74c3c"),
]
else:
series_levels = [
(PRICE_LEVEL_VERY_CHEAP, "#2ecc71"),
(PRICE_LEVEL_CHEAP, "#27ae60"),
(PRICE_LEVEL_NORMAL, "#f1c40f"),
(PRICE_LEVEL_EXPENSIVE, "#e67e22"),
(PRICE_LEVEL_VERY_EXPENSIVE, "#e74c3c"),
]
series = []
for level_key, color in series_levels:
# Get translated name for the level using helper function
name = _get_level_translation(level_key, level_type, user_language)
# Use server-side insert_nulls='segments' for clean gaps
if level_type == "rating_level":
filter_param = f"rating_level_filter: ['{level_key}']"
else:
filter_param = f"level_filter: ['{level_key}']"
data_generator = (
f"const response = await hass.callWS({{ "
f"type: 'call_service', "
f"domain: 'tibber_prices', "
f"service: 'get_chartdata', "
f"return_response: true, "
f"service_data: {{ entry_id: '{entry_id}', day: ['{day}'], {filter_param}, "
f"output_format: 'array_of_arrays', insert_nulls: 'segments', minor_currency: true }} }}); "
f"return response.response.data;"
)
# Only show extremas for HIGH and LOW levels (not NORMAL)
show_extremas = level_key != "NORMAL"
series.append(
{
"entity": sample_entity or "sensor.tibber_prices",
"name": name,
"type": "area",
"color": color,
"yaxis_id": "price",
"show": {"extremas": show_extremas, "legend_value": False},
"data_generator": data_generator,
"stroke_width": 1,
}
)
# Get translated title based on level_type
title_key = "title_rating_level" if level_type == "rating_level" else "title_level"
title = get_translation(["apexcharts", title_key], user_language) or (
"Price Phases Daily Progress" if level_type == "rating_level" else "Price Level"
)
# Add translated day to title
day_translated = get_translation(["selector", "day", "options", day], user_language) or day.capitalize()
title = f"{title} - {day_translated}"
# Configure span based on selected day
if day == "yesterday":
span_config = {"start": "day", "offset": "-1d"}
elif day == "tomorrow":
span_config = {"start": "day", "offset": "+1d"}
else: # today
span_config = {"start": "day"}
return {
"type": "custom:apexcharts-card",
"update_interval": "5m",
"span": span_config,
"header": {
"show": True,
"title": title,
"show_states": False,
},
"apex_config": {
"chart": {
"animations": {"enabled": False},
"toolbar": {"show": True, "tools": {"zoom": True, "pan": True}},
"zoom": {"enabled": True},
},
"stroke": {"curve": "stepline", "width": 2},
"fill": {
"type": "gradient",
"opacity": 0.4,
"gradient": {
"shade": "dark",
"type": "vertical",
"shadeIntensity": 0.5,
"opacityFrom": 0.7,
"opacityTo": 0.2,
},
},
"dataLabels": {"enabled": False},
"tooltip": {
"x": {"format": "HH:mm"},
"y": {"title": {"formatter": f"function() {{ return '{price_unit}'; }}"}},
},
"legend": {
"show": True,
"position": "top",
"horizontalAlign": "left",
"markers": {"radius": 2},
},
"grid": {
"show": True,
"borderColor": "#40475D",
"strokeDashArray": 4,
"xaxis": {"lines": {"show": True}},
"yaxis": {"lines": {"show": True}},
},
"markers": {"size": 0},
},
"yaxis": [
{
"id": "price",
"decimals": 2,
"min": 0,
"apex_config": {"title": {"text": price_unit}},
},
],
"now": {"show": True, "color": "#8e24aa", "label": "🕒 LIVE"},
"all_series_config": {
"stroke_width": 1,
"group_by": {"func": "raw", "duration": "15min"},
},
"series": series,
}
async def _refresh_user_data(call: ServiceCall) -> dict[str, Any]:
"""Refresh user data for a specific config entry and return updated information."""
entry_id = call.data.get(ATTR_ENTRY_ID)
hass = call.hass
if not entry_id:
return {
"success": False,
"message": "Entry ID is required",
}
# Get the entry and coordinator
try:
_, coordinator, _ = _get_entry_and_data(hass, entry_id)
except ServiceValidationError as ex:
return {
"success": False,
"message": f"Invalid entry ID: {ex}",
}
# Force refresh user data using the public method
try:
updated = await coordinator.refresh_user_data()
except (
TibberPricesApiClientAuthenticationError,
TibberPricesApiClientCommunicationError,
TibberPricesApiClientError,
) as ex:
return {
"success": False,
"message": f"API error refreshing user data: {ex!s}",
}
else:
if updated:
user_profile = coordinator.get_user_profile()
homes = coordinator.get_user_homes()
return {
"success": True,
"message": "User data refreshed successfully",
"user_profile": user_profile,
"homes_count": len(homes),
"homes": homes,
"last_updated": user_profile.get("last_updated"),
}
return {
"success": False,
"message": "User data was already up to date",
}
# --- Helpers ---
def _get_entry_and_data(hass: HomeAssistant, entry_id: str) -> tuple[Any, Any, dict]:
"""Validate entry and extract coordinator and data."""
if not entry_id:
raise ServiceValidationError(translation_domain=DOMAIN, translation_key="missing_entry_id")
entry = next(
(e for e in hass.config_entries.async_entries(DOMAIN) if e.entry_id == entry_id),
None,
)
if not entry or not hasattr(entry, "runtime_data") or not entry.runtime_data:
raise ServiceValidationError(translation_domain=DOMAIN, translation_key="invalid_entry_id")
coordinator = entry.runtime_data.coordinator
data = coordinator.data or {}
return entry, coordinator, data
# --- Service registration ---
@callback
def async_setup_services(hass: HomeAssistant) -> None:
"""Set up services for Tibber Prices integration."""
hass.services.async_register(
DOMAIN,
APEXCHARTS_YAML_SERVICE_NAME,
_get_apexcharts_yaml,
schema=APEXCHARTS_SERVICE_SCHEMA,
supports_response=SupportsResponse.ONLY,
)
hass.services.async_register(
DOMAIN,
CHARTDATA_SERVICE_NAME,
_get_chartdata,
schema=CHARTDATA_SERVICE_SCHEMA,
supports_response=SupportsResponse.ONLY,
)
hass.services.async_register(
DOMAIN,
REFRESH_USER_DATA_SERVICE_NAME,
_refresh_user_data,
schema=REFRESH_USER_DATA_SERVICE_SCHEMA,
supports_response=SupportsResponse.ONLY,
)