hass.tibber_prices/custom_components/tibber_prices/services.py
Julian Pawlowski fb70f29ac9 feat(services): rewrite ApexCharts service for modern workflow
Complete overhaul of the ApexCharts integration service layer to support
modern chart card workflows with flexible data formatting and filtering.

Replaced services:
- Removed: get_price, get_apexcharts_data (legacy, entity-based)
- Added: get_chartdata (flexible data service)
- Improved: get_apexcharts_yaml (now uses get_chartdata internally)

New get_chartdata service features:
- Multiple output formats (array_of_objects, array_of_arrays)
- Customizable field names for chart compatibility
- Resolution options (15-min intervals, hourly averages)
- Advanced filtering (level_filter, rating_level_filter)
- NULL insertion modes (none, segments, all) for clean gaps
- Minor currency support (cents/øre) with custom rounding
- Optional fields (level, rating_level, average)
- Multi-day support (yesterday/today/tomorrow)

Enhanced get_apexcharts_yaml service:
- Direct entry_id parameter (no entity_id lookup needed)
- Uses get_chartdata with WebSocket API (data_generator)
- Improved ApexCharts configuration:
  * Gradient fill (70% opacity → 20%)
  * Grid styling with dashed lines
  * Zoom & Pan tools (animations disabled for performance)
  * Optimized legend (top-left, compact markers)
  * Y-axis auto-scaling (min: 0 for visibility, supports negative prices)
  * 2 decimal places (improved precision)
  * Browser locale formatting (automatic comma/point)
  * insert_nulls='segments' for clean gaps between levels
- Multi-language support (translated titles, series names)
- Day selection (yesterday/today/tomorrow with correct span config)

Service translations:
- Added comprehensive field descriptions (all 5 languages: de, en, nb, nl, sv)
- Selector translations for all options (day, resolution, output_format, etc.)
- ApexCharts title translations in custom_translations/

Technical improvements:
- Hourly aggregation uses exact 4-interval windows (:00/:15/:30/:45)
- Level/rating aggregation follows sensor logic (aggregate_level_data, aggregate_rating_data)
- Midnight extension for last interval of filtered data (seamless day transitions)
- Case-insensitive filter matching (normalized to uppercase)
- Ruff complexity fixed (extracted _get_level_translation helper)

Impact: Users can now generate production-ready ApexCharts YAML with a single
service call, or use get_chartdata flexibly with any chart card (ApexCharts,
Plotly, Mini Graph, etc.). Supports complex filtering scenarios (e.g., "show
only LOW rating periods") with clean visual gaps. Full multi-language support.
2025-11-16 23:52:36 +00:00

872 lines
36 KiB
Python

"""Services for Tibber Prices integration."""
from __future__ import annotations
import re
from datetime import timedelta
from typing import Any, Final
import voluptuous as vol
from homeassistant.core import HomeAssistant, ServiceCall, SupportsResponse, callback
from homeassistant.exceptions import ServiceValidationError
from homeassistant.helpers.entity_registry import async_get as async_get_entity_registry
from homeassistant.util import dt as dt_util
from .api import (
TibberPricesApiClientAuthenticationError,
TibberPricesApiClientCommunicationError,
TibberPricesApiClientError,
)
from .const import (
CONF_PRICE_RATING_THRESHOLD_HIGH,
CONF_PRICE_RATING_THRESHOLD_LOW,
DEFAULT_PRICE_RATING_THRESHOLD_HIGH,
DEFAULT_PRICE_RATING_THRESHOLD_LOW,
DOMAIN,
PRICE_LEVEL_CHEAP,
PRICE_LEVEL_EXPENSIVE,
PRICE_LEVEL_NORMAL,
PRICE_LEVEL_VERY_CHEAP,
PRICE_LEVEL_VERY_EXPENSIVE,
PRICE_RATING_HIGH,
PRICE_RATING_LOW,
PRICE_RATING_NORMAL,
format_price_unit_minor,
get_translation,
)
from .sensor.helpers import aggregate_level_data, aggregate_rating_data
APEXCHARTS_YAML_SERVICE_NAME = "get_apexcharts_yaml"
CHARTDATA_SERVICE_NAME = "get_chartdata"
REFRESH_USER_DATA_SERVICE_NAME = "refresh_user_data"
ATTR_DAY: Final = "day"
ATTR_ENTRY_ID: Final = "entry_id"
APEXCHARTS_SERVICE_SCHEMA: Final = vol.Schema(
{
vol.Required(ATTR_ENTRY_ID): str,
vol.Optional("day", default="today"): vol.In(["yesterday", "today", "tomorrow"]),
vol.Optional("level_type", default="rating_level"): vol.In(["rating_level", "level"]),
}
)
def _normalize_level_filter(value: list[str] | None) -> list[str] | None:
"""Convert level filter values to uppercase for case-insensitive comparison."""
if value is None:
return None
return [v.upper() for v in value]
def _normalize_rating_level_filter(value: list[str] | None) -> list[str] | None:
"""Convert rating level filter values to uppercase for case-insensitive comparison."""
if value is None:
return None
return [v.upper() for v in value]
CHARTDATA_SERVICE_SCHEMA: Final = vol.Schema(
{
vol.Required(ATTR_ENTRY_ID): str,
vol.Optional(ATTR_DAY): vol.All(vol.Coerce(list), [vol.In(["yesterday", "today", "tomorrow"])]),
vol.Optional("resolution", default="interval"): vol.In(["interval", "hourly"]),
vol.Optional("output_format", default="array_of_objects"): vol.In(["array_of_objects", "array_of_arrays"]),
vol.Optional("array_fields"): str,
vol.Optional("minor_currency", default=False): bool,
vol.Optional("round_decimals"): vol.All(vol.Coerce(int), vol.Range(min=0, max=10)),
vol.Optional("include_level", default=False): bool,
vol.Optional("include_rating_level", default=False): bool,
vol.Optional("include_average", default=False): bool,
vol.Optional("level_filter"): vol.All(
vol.Coerce(list),
_normalize_level_filter,
[
vol.In(
[
PRICE_LEVEL_VERY_CHEAP,
PRICE_LEVEL_CHEAP,
PRICE_LEVEL_NORMAL,
PRICE_LEVEL_EXPENSIVE,
PRICE_LEVEL_VERY_EXPENSIVE,
]
)
],
),
vol.Optional("rating_level_filter"): vol.All(
vol.Coerce(list),
_normalize_rating_level_filter,
[vol.In([PRICE_RATING_LOW, PRICE_RATING_NORMAL, PRICE_RATING_HIGH])],
),
vol.Optional("insert_nulls", default="none"): vol.In(["none", "segments", "all"]),
vol.Optional("add_trailing_null", default=False): bool,
vol.Optional("timestamp_field", default="start_time"): str,
vol.Optional("price_field", default="price_per_kwh"): str,
vol.Optional("level_field", default="level"): str,
vol.Optional("rating_level_field", default="rating_level"): str,
vol.Optional("average_field", default="average"): str,
vol.Optional("data_key", default="data"): str,
}
)
REFRESH_USER_DATA_SERVICE_SCHEMA: Final = vol.Schema(
{
vol.Required(ATTR_ENTRY_ID): str,
}
)
# --- Entry point: Service handler ---
def _aggregate_hourly_exact( # noqa: PLR0913, PLR0912
intervals: list[dict],
timestamp_field: str,
price_field: str,
*,
use_minor_currency: bool = False,
round_decimals: int | None = None,
include_level: bool = False,
include_rating_level: bool = False,
level_filter: list[str] | None = None,
rating_level_filter: list[str] | None = None,
include_average: bool = False,
level_field: str = "level",
rating_level_field: str = "rating_level",
average_field: str = "average",
day_average: float | None = None,
threshold_low: float = DEFAULT_PRICE_RATING_THRESHOLD_LOW,
threshold_high: float = DEFAULT_PRICE_RATING_THRESHOLD_HIGH,
) -> list[dict]:
"""
Aggregate 15-minute intervals to exact hourly averages.
Each hour uses exactly 4 intervals (00:00, 00:15, 00:30, 00:45).
Returns data points at the start of each hour.
"""
if not intervals:
return []
hourly_data = []
i = 0
while i < len(intervals):
interval = intervals[i]
start_time_str = interval.get("startsAt")
if not start_time_str:
i += 1
continue
# Parse the timestamp
start_time = dt_util.parse_datetime(start_time_str)
if not start_time:
i += 1
continue
# Check if this is the start of an hour (:00)
if start_time.minute != 0:
i += 1
continue
# Collect 4 intervals for this hour (with optional filtering)
hour_intervals = []
hour_interval_data = [] # Complete interval data for aggregation functions
for j in range(4):
if i + j < len(intervals):
interval = intervals[i + j]
# Apply level filter if specified
if level_filter is not None and "level" in interval and interval["level"] not in level_filter:
continue
# Apply rating_level filter if specified
if (
rating_level_filter is not None
and "rating_level" in interval
and interval["rating_level"] not in rating_level_filter
):
continue
price = interval.get("total")
if price is not None:
hour_intervals.append(price)
hour_interval_data.append(interval)
# Calculate average if we have data
if hour_intervals:
avg_price = sum(hour_intervals) / len(hour_intervals)
# Convert to minor currency (cents/øre) if requested
avg_price = round(avg_price * 100, 2) if use_minor_currency else round(avg_price, 4)
# Apply custom rounding if specified
if round_decimals is not None:
avg_price = round(avg_price, round_decimals)
data_point = {timestamp_field: start_time_str, price_field: avg_price}
# Add aggregated level using same logic as sensors
if include_level and hour_interval_data:
aggregated_level = aggregate_level_data(hour_interval_data)
if aggregated_level:
data_point[level_field] = aggregated_level.upper() # Convert back to uppercase
# Add aggregated rating_level using same logic as sensors
if include_rating_level and hour_interval_data:
aggregated_rating = aggregate_rating_data(hour_interval_data, threshold_low, threshold_high)
if aggregated_rating:
data_point[rating_level_field] = aggregated_rating.upper() # Convert back to uppercase
# Add average if requested
if include_average and day_average is not None:
data_point[average_field] = day_average
hourly_data.append(data_point)
# Move to next hour (skip 4 intervals)
i += 4
return hourly_data
async def _get_chartdata(call: ServiceCall) -> dict[str, Any]: # noqa: PLR0912, PLR0915, C901
"""Return price data in a simple chart-friendly format similar to Tibber Core integration."""
hass = call.hass
entry_id_raw = call.data.get(ATTR_ENTRY_ID)
if entry_id_raw is None:
raise ServiceValidationError(translation_domain=DOMAIN, translation_key="missing_entry_id")
entry_id: str = str(entry_id_raw)
days_raw = call.data.get(ATTR_DAY)
# If no day specified, return all available data (today + tomorrow)
if days_raw is None:
days = ["today", "tomorrow"]
# Convert single string to list for uniform processing
elif isinstance(days_raw, str):
days = [days_raw]
else:
days = days_raw
timestamp_field = call.data.get("timestamp_field", "start_time")
price_field = call.data.get("price_field", "price_per_kwh")
level_field = call.data.get("level_field", "level")
rating_level_field = call.data.get("rating_level_field", "rating_level")
average_field = call.data.get("average_field", "average")
data_key = call.data.get("data_key", "data")
resolution = call.data.get("resolution", "interval")
output_format = call.data.get("output_format", "array_of_objects")
minor_currency = call.data.get("minor_currency", False)
round_decimals = call.data.get("round_decimals")
include_level = call.data.get("include_level", False)
include_rating_level = call.data.get("include_rating_level", False)
include_average = call.data.get("include_average", False)
insert_nulls = call.data.get("insert_nulls", "none")
add_trailing_null = call.data.get("add_trailing_null", False)
# Filter values are already normalized to uppercase by schema validators
level_filter = call.data.get("level_filter")
rating_level_filter = call.data.get("rating_level_filter")
# If array_fields is specified, implicitly enable fields that are used
array_fields_template = call.data.get("array_fields")
if array_fields_template and output_format == "array_of_arrays":
if level_field in array_fields_template:
include_level = True
if rating_level_field in array_fields_template:
include_rating_level = True
if average_field in array_fields_template:
include_average = True
_, coordinator, _ = _get_entry_and_data(hass, entry_id)
# Get thresholds from config for rating aggregation
threshold_low = coordinator.config_entry.options.get(
CONF_PRICE_RATING_THRESHOLD_LOW, DEFAULT_PRICE_RATING_THRESHOLD_LOW
)
threshold_high = coordinator.config_entry.options.get(
CONF_PRICE_RATING_THRESHOLD_HIGH, DEFAULT_PRICE_RATING_THRESHOLD_HIGH
)
# Get price data for all requested days
price_info = coordinator.data.get("priceInfo", {})
chart_data = []
# Collect all timestamps if insert_nulls='all' (needed to insert NULLs for missing filter matches)
all_timestamps = set()
if insert_nulls == "all" and (level_filter or rating_level_filter):
for day in days:
day_prices = price_info.get(day, [])
for interval in day_prices:
start_time = interval.get("startsAt")
if start_time:
all_timestamps.add(start_time)
all_timestamps = sorted(all_timestamps)
# Calculate average if requested
day_averages = {}
if include_average:
for day in days:
day_prices = price_info.get(day, [])
if day_prices:
prices = [p["total"] for p in day_prices if p.get("total") is not None]
if prices:
avg = sum(prices) / len(prices)
# Apply same transformations as to regular prices
avg = round(avg * 100, 2) if minor_currency else round(avg, 4)
if round_decimals is not None:
avg = round(avg, round_decimals)
day_averages[day] = avg
for day in days:
day_prices = price_info.get(day, [])
if resolution == "interval":
# Original 15-minute intervals
if insert_nulls == "all" and (level_filter or rating_level_filter):
# Mode 'all': Insert NULL for all timestamps where filter doesn't match
# Build a map of timestamp -> interval for quick lookup
interval_map = {
interval.get("startsAt"): interval for interval in day_prices if interval.get("startsAt")
}
# Process all timestamps, filling gaps with NULL
for start_time in all_timestamps:
interval = interval_map.get(start_time)
if interval is None:
# No data for this timestamp - skip entirely
continue
price = interval.get("total")
if price is None:
continue
# Check if this interval matches the filter
matches_filter = False
if level_filter and "level" in interval:
matches_filter = interval["level"] in level_filter
elif rating_level_filter and "rating_level" in interval:
matches_filter = interval["rating_level"] in rating_level_filter
# If filter is set but doesn't match, insert NULL price
if not matches_filter:
price = None
elif price is not None:
# Convert to minor currency (cents/øre) if requested
price = round(price * 100, 2) if minor_currency else round(price, 4)
# Apply custom rounding if specified
if round_decimals is not None:
price = round(price, round_decimals)
data_point = {timestamp_field: start_time, price_field: price}
# Add level if requested (only when price is not NULL)
if include_level and "level" in interval and price is not None:
data_point[level_field] = interval["level"]
# Add rating_level if requested (only when price is not NULL)
if include_rating_level and "rating_level" in interval and price is not None:
data_point[rating_level_field] = interval["rating_level"]
# Add average if requested
if include_average and day in day_averages:
data_point[average_field] = day_averages[day]
chart_data.append(data_point)
elif insert_nulls == "segments" and (level_filter or rating_level_filter):
# Mode 'segments': Add NULL points at segment boundaries for clean gaps
# Determine which field to check based on filter type
filter_field = "rating_level" if rating_level_filter else "level"
filter_values = rating_level_filter if rating_level_filter else level_filter
for i in range(len(day_prices) - 1):
interval = day_prices[i]
next_interval = day_prices[i + 1]
start_time = interval.get("startsAt")
price = interval.get("total")
next_start_time = next_interval.get("startsAt")
if start_time is None or price is None:
continue
interval_value = interval.get(filter_field)
next_value = next_interval.get(filter_field)
# Check if current interval matches filter
if interval_value in filter_values:
# Convert price
converted_price = round(price * 100, 2) if minor_currency else round(price, 4)
if round_decimals is not None:
converted_price = round(converted_price, round_decimals)
# Add current point
data_point = {timestamp_field: start_time, price_field: converted_price}
if include_level and "level" in interval:
data_point[level_field] = interval["level"]
if include_rating_level and "rating_level" in interval:
data_point[rating_level_field] = interval["rating_level"]
if include_average and day in day_averages:
data_point[average_field] = day_averages[day]
chart_data.append(data_point)
# Check if next interval is different level (segment boundary)
if next_value != interval_value:
# Hold current price until next timestamp (stepline effect)
hold_point = {timestamp_field: next_start_time, price_field: converted_price}
if include_level and "level" in interval:
hold_point[level_field] = interval["level"]
if include_rating_level and "rating_level" in interval:
hold_point[rating_level_field] = interval["rating_level"]
if include_average and day in day_averages:
hold_point[average_field] = day_averages[day]
chart_data.append(hold_point)
# Add NULL point to create gap
null_point = {timestamp_field: next_start_time, price_field: None}
chart_data.append(null_point)
# Handle last interval of the day - extend to midnight
if day_prices:
last_interval = day_prices[-1]
last_start_time = last_interval.get("startsAt")
last_price = last_interval.get("total")
last_value = last_interval.get(filter_field)
if last_start_time and last_price is not None and last_value in filter_values:
# Parse timestamp and calculate midnight of next day
last_dt = dt_util.parse_datetime(last_start_time)
if last_dt:
last_dt = dt_util.as_local(last_dt)
# Calculate next day at 00:00
next_day = last_dt.replace(hour=0, minute=0, second=0, microsecond=0)
next_day = next_day + timedelta(days=1)
midnight_timestamp = next_day.isoformat()
# Try to get real price from tomorrow's first interval
next_day_name = None
if day == "yesterday":
next_day_name = "today"
elif day == "today":
next_day_name = "tomorrow"
# For "tomorrow", we don't have a "day after tomorrow"
midnight_price = None
midnight_interval = None
if next_day_name:
next_day_prices = price_info.get(next_day_name, [])
if next_day_prices:
first_next = next_day_prices[0]
first_next_value = first_next.get(filter_field)
# Only use tomorrow's price if it matches the same filter
if first_next_value == last_value:
midnight_price = first_next.get("total")
midnight_interval = first_next
# Fallback: use last interval's price if no tomorrow data or different level
if midnight_price is None:
midnight_price = last_price
midnight_interval = last_interval
# Convert price
converted_price = (
round(midnight_price * 100, 2) if minor_currency else round(midnight_price, 4)
)
if round_decimals is not None:
converted_price = round(converted_price, round_decimals)
# Add point at midnight with appropriate price (extends graph to end of day)
end_point = {timestamp_field: midnight_timestamp, price_field: converted_price}
if midnight_interval is not None:
if include_level and "level" in midnight_interval:
end_point[level_field] = midnight_interval["level"]
if include_rating_level and "rating_level" in midnight_interval:
end_point[rating_level_field] = midnight_interval["rating_level"]
if include_average and day in day_averages:
end_point[average_field] = day_averages[day]
chart_data.append(end_point)
else:
# Mode 'none' (default): Only return matching intervals, no NULL insertion
for interval in day_prices:
start_time = interval.get("startsAt")
price = interval.get("total")
if start_time is not None and price is not None:
# Apply level filter if specified
if level_filter is not None and "level" in interval and interval["level"] not in level_filter:
continue
# Apply rating_level filter if specified
if (
rating_level_filter is not None
and "rating_level" in interval
and interval["rating_level"] not in rating_level_filter
):
continue
# Convert to minor currency (cents/øre) if requested
price = round(price * 100, 2) if minor_currency else round(price, 4)
# Apply custom rounding if specified
if round_decimals is not None:
price = round(price, round_decimals)
data_point = {timestamp_field: start_time, price_field: price}
# Add level if requested
if include_level and "level" in interval:
data_point[level_field] = interval["level"]
# Add rating_level if requested
if include_rating_level and "rating_level" in interval:
data_point[rating_level_field] = interval["rating_level"]
# Add average if requested
if include_average and day in day_averages:
data_point[average_field] = day_averages[day]
chart_data.append(data_point)
elif resolution == "hourly":
# Hourly averages (4 intervals per hour: :00, :15, :30, :45)
chart_data.extend(
_aggregate_hourly_exact(
day_prices,
timestamp_field,
price_field,
use_minor_currency=minor_currency,
round_decimals=round_decimals,
include_level=include_level,
include_rating_level=include_rating_level,
level_filter=level_filter,
rating_level_filter=rating_level_filter,
include_average=include_average,
level_field=level_field,
rating_level_field=rating_level_field,
average_field=average_field,
day_average=day_averages.get(day),
threshold_low=threshold_low,
threshold_high=threshold_high,
)
)
# Convert to array of arrays format if requested
if output_format == "array_of_arrays":
array_fields_template = call.data.get("array_fields")
# Default: nur timestamp und price
if not array_fields_template:
array_fields_template = f"{{{timestamp_field}}}, {{{price_field}}}"
# Parse template to extract field names
field_pattern = re.compile(r"\{([^}]+)\}")
field_names = field_pattern.findall(array_fields_template)
if not field_names:
raise ServiceValidationError(
translation_domain=DOMAIN,
translation_key="invalid_array_fields",
translation_placeholders={"template": array_fields_template},
)
# Convert to [[field1, field2, ...], ...] format
points = []
for item in chart_data:
row = []
for field_name in field_names:
# Get value from item, or None if field doesn't exist
value = item.get(field_name)
row.append(value)
points.append(row)
# Add final null point for stepline rendering if requested
# (some chart libraries need this to prevent extrapolation to viewport edge)
if add_trailing_null and points:
null_row = [points[-1][0]] + [None] * (len(field_names) - 1)
points.append(null_row)
return {data_key: points}
# Add trailing null point for array_of_objects format if requested
if add_trailing_null and chart_data:
# Create a null point with only timestamp from last item, all other fields as None
last_item = chart_data[-1]
null_point = {timestamp_field: last_item.get(timestamp_field)}
# Set all other potential fields to None
for field in [price_field, level_field, rating_level_field, average_field]:
if field in last_item:
null_point[field] = None
chart_data.append(null_point)
return {data_key: chart_data}
def _get_level_translation(level_key: str, level_type: str, language: str) -> str:
"""Get translated name for a price level or rating level."""
level_key_lower = level_key.lower()
# Use correct translation key based on level_type
if level_type == "rating_level":
name = get_translation(["selector", "rating_level_filter", "options", level_key_lower], language)
else:
name = get_translation(["selector", "level_filter", "options", level_key_lower], language)
# Fallback to original key if translation not found
return name or level_key
async def _get_apexcharts_yaml(call: ServiceCall) -> dict[str, Any]:
"""Return a YAML snippet for an ApexCharts card using the get_apexcharts_data service for each level."""
hass = call.hass
entry_id_raw = call.data.get(ATTR_ENTRY_ID)
if entry_id_raw is None:
raise ServiceValidationError(translation_domain=DOMAIN, translation_key="missing_entry_id")
entry_id: str = str(entry_id_raw)
day = call.data.get("day", "today")
level_type = call.data.get("level_type", "rating_level")
# Get user's language from hass config
user_language = hass.config.language or "en"
# Get coordinator to access price data (for currency)
_, coordinator, _ = _get_entry_and_data(hass, entry_id)
price_info = coordinator.data.get("priceInfo", {})
currency = price_info.get("currency", "EUR")
price_unit = format_price_unit_minor(currency)
# Get a sample entity_id for the series (first sensor from this entry)
entity_registry = async_get_entity_registry(hass)
sample_entity = None
for entity in entity_registry.entities.values():
if entity.config_entry_id == entry_id and entity.domain == "sensor":
sample_entity = entity.entity_id
break
if level_type == "rating_level":
series_levels = [
(PRICE_RATING_LOW, "#2ecc71"),
(PRICE_RATING_NORMAL, "#f1c40f"),
(PRICE_RATING_HIGH, "#e74c3c"),
]
else:
series_levels = [
(PRICE_LEVEL_VERY_CHEAP, "#2ecc71"),
(PRICE_LEVEL_CHEAP, "#27ae60"),
(PRICE_LEVEL_NORMAL, "#f1c40f"),
(PRICE_LEVEL_EXPENSIVE, "#e67e22"),
(PRICE_LEVEL_VERY_EXPENSIVE, "#e74c3c"),
]
series = []
for level_key, color in series_levels:
# Get translated name for the level using helper function
name = _get_level_translation(level_key, level_type, user_language)
# Use server-side insert_nulls='segments' for clean gaps
if level_type == "rating_level":
filter_param = f"rating_level_filter: ['{level_key}']"
else:
filter_param = f"level_filter: ['{level_key}']"
data_generator = (
f"const response = await hass.callWS({{ "
f"type: 'call_service', "
f"domain: 'tibber_prices', "
f"service: 'get_chartdata', "
f"return_response: true, "
f"service_data: {{ entry_id: '{entry_id}', day: ['{day}'], {filter_param}, "
f"output_format: 'array_of_arrays', insert_nulls: 'segments', minor_currency: true }} }}); "
f"return response.response.data;"
)
# Only show extremas for HIGH and LOW levels (not NORMAL)
show_extremas = level_key != "NORMAL"
series.append(
{
"entity": sample_entity or "sensor.tibber_prices",
"name": name,
"type": "area",
"color": color,
"yaxis_id": "price",
"show": {"extremas": show_extremas, "legend_value": False},
"data_generator": data_generator,
"stroke_width": 1,
}
)
# Get translated title based on level_type
title_key = "title_rating_level" if level_type == "rating_level" else "title_level"
title = get_translation(["apexcharts", title_key], user_language) or (
"Price Phases Daily Progress" if level_type == "rating_level" else "Price Level"
)
# Add translated day to title
day_translated = get_translation(["selector", "day", "options", day], user_language) or day.capitalize()
title = f"{title} - {day_translated}"
# Configure span based on selected day
if day == "yesterday":
span_config = {"start": "day", "offset": "-1d"}
elif day == "tomorrow":
span_config = {"start": "day", "offset": "+1d"}
else: # today
span_config = {"start": "day"}
return {
"type": "custom:apexcharts-card",
"update_interval": "5m",
"span": span_config,
"header": {
"show": True,
"title": title,
"show_states": False,
},
"apex_config": {
"chart": {
"animations": {"enabled": False},
"toolbar": {"show": True, "tools": {"zoom": True, "pan": True}},
"zoom": {"enabled": True},
},
"stroke": {"curve": "stepline", "width": 2},
"fill": {
"type": "gradient",
"opacity": 0.4,
"gradient": {
"shade": "dark",
"type": "vertical",
"shadeIntensity": 0.5,
"opacityFrom": 0.7,
"opacityTo": 0.2,
},
},
"dataLabels": {"enabled": False},
"tooltip": {
"x": {"format": "HH:mm"},
"y": {"title": {"formatter": f"function() {{ return '{price_unit}'; }}"}},
},
"legend": {
"show": True,
"position": "top",
"horizontalAlign": "left",
"markers": {"radius": 2},
},
"grid": {
"show": True,
"borderColor": "#40475D",
"strokeDashArray": 4,
"xaxis": {"lines": {"show": True}},
"yaxis": {"lines": {"show": True}},
},
"markers": {"size": 0},
},
"yaxis": [
{
"id": "price",
"decimals": 2,
"min": 0,
"apex_config": {"title": {"text": price_unit}},
},
],
"now": {"show": True, "color": "#8e24aa", "label": "🕒 LIVE"},
"all_series_config": {
"stroke_width": 1,
"group_by": {"func": "raw", "duration": "15min"},
},
"series": series,
}
async def _refresh_user_data(call: ServiceCall) -> dict[str, Any]:
"""Refresh user data for a specific config entry and return updated information."""
entry_id = call.data.get(ATTR_ENTRY_ID)
hass = call.hass
if not entry_id:
return {
"success": False,
"message": "Entry ID is required",
}
# Get the entry and coordinator
try:
_, coordinator, _ = _get_entry_and_data(hass, entry_id)
except ServiceValidationError as ex:
return {
"success": False,
"message": f"Invalid entry ID: {ex}",
}
# Force refresh user data using the public method
try:
updated = await coordinator.refresh_user_data()
except (
TibberPricesApiClientAuthenticationError,
TibberPricesApiClientCommunicationError,
TibberPricesApiClientError,
) as ex:
return {
"success": False,
"message": f"API error refreshing user data: {ex!s}",
}
else:
if updated:
user_profile = coordinator.get_user_profile()
homes = coordinator.get_user_homes()
return {
"success": True,
"message": "User data refreshed successfully",
"user_profile": user_profile,
"homes_count": len(homes),
"homes": homes,
"last_updated": user_profile.get("last_updated"),
}
return {
"success": False,
"message": "User data was already up to date",
}
# --- Helpers ---
def _get_entry_and_data(hass: HomeAssistant, entry_id: str) -> tuple[Any, Any, dict]:
"""Validate entry and extract coordinator and data."""
if not entry_id:
raise ServiceValidationError(translation_domain=DOMAIN, translation_key="missing_entry_id")
entry = next(
(e for e in hass.config_entries.async_entries(DOMAIN) if e.entry_id == entry_id),
None,
)
if not entry or not hasattr(entry, "runtime_data") or not entry.runtime_data:
raise ServiceValidationError(translation_domain=DOMAIN, translation_key="invalid_entry_id")
coordinator = entry.runtime_data.coordinator
data = coordinator.data or {}
return entry, coordinator, data
# --- Service registration ---
@callback
def async_setup_services(hass: HomeAssistant) -> None:
"""Set up services for Tibber Prices integration."""
hass.services.async_register(
DOMAIN,
APEXCHARTS_YAML_SERVICE_NAME,
_get_apexcharts_yaml,
schema=APEXCHARTS_SERVICE_SCHEMA,
supports_response=SupportsResponse.ONLY,
)
hass.services.async_register(
DOMAIN,
CHARTDATA_SERVICE_NAME,
_get_chartdata,
schema=CHARTDATA_SERVICE_SCHEMA,
supports_response=SupportsResponse.ONLY,
)
hass.services.async_register(
DOMAIN,
REFRESH_USER_DATA_SERVICE_NAME,
_refresh_user_data,
schema=REFRESH_USER_DATA_SERVICE_SCHEMA,
supports_response=SupportsResponse.ONLY,
)