diff --git a/custom_components/tibber_prices/services/__init__.py b/custom_components/tibber_prices/services/__init__.py new file mode 100644 index 0000000..9e8ca37 --- /dev/null +++ b/custom_components/tibber_prices/services/__init__.py @@ -0,0 +1,68 @@ +""" +Service handlers for Tibber Prices integration. + +This package provides service endpoints for external integrations and data export: +- Chart data export (get_chartdata) +- ApexCharts YAML generation (get_apexcharts_yaml) +- User data refresh (refresh_user_data) + +Architecture: +- helpers.py: Common utilities (get_entry_and_data) +- formatters.py: Data transformation and formatting functions +- chartdata.py: Main data export service handler +- apexcharts.py: ApexCharts card YAML generator +- refresh_user_data.py: User data refresh handler + +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from custom_components.tibber_prices.const import DOMAIN +from homeassistant.core import SupportsResponse, callback + +from .apexcharts import ( + APEXCHARTS_SERVICE_SCHEMA, + APEXCHARTS_YAML_SERVICE_NAME, + handle_apexcharts_yaml, +) +from .chartdata import CHARTDATA_SERVICE_NAME, CHARTDATA_SERVICE_SCHEMA, handle_chartdata +from .refresh_user_data import ( + REFRESH_USER_DATA_SERVICE_NAME, + REFRESH_USER_DATA_SERVICE_SCHEMA, + handle_refresh_user_data, +) + +if TYPE_CHECKING: + from homeassistant.core import HomeAssistant + +__all__ = [ + "async_setup_services", +] + + +@callback +def async_setup_services(hass: HomeAssistant) -> None: + """Set up services for Tibber Prices integration.""" + hass.services.async_register( + DOMAIN, + APEXCHARTS_YAML_SERVICE_NAME, + handle_apexcharts_yaml, + schema=APEXCHARTS_SERVICE_SCHEMA, + supports_response=SupportsResponse.ONLY, + ) + hass.services.async_register( + DOMAIN, + CHARTDATA_SERVICE_NAME, + handle_chartdata, + schema=CHARTDATA_SERVICE_SCHEMA, + supports_response=SupportsResponse.ONLY, + ) + hass.services.async_register( + DOMAIN, + REFRESH_USER_DATA_SERVICE_NAME, + handle_refresh_user_data, + schema=REFRESH_USER_DATA_SERVICE_SCHEMA, + supports_response=SupportsResponse.ONLY, + ) diff --git a/custom_components/tibber_prices/services/apexcharts.py b/custom_components/tibber_prices/services/apexcharts.py new file mode 100644 index 0000000..e2c0ef3 --- /dev/null +++ b/custom_components/tibber_prices/services/apexcharts.py @@ -0,0 +1,238 @@ +""" +ApexCharts YAML generation service handler. + +This module implements the `get_apexcharts_yaml` service, which generates +ready-to-use YAML configuration for ApexCharts cards with price level visualization. + +Features: +- Automatic color-coded series per price level/rating +- Server-side NULL insertion for clean gaps +- Translated level names and titles +- Responsive to user language settings +- Configurable day selection (yesterday/today/tomorrow) + +Service: tibber_prices.get_apexcharts_yaml +Response: YAML configuration dict for ApexCharts card + +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any, Final + +import voluptuous as vol + +from custom_components.tibber_prices.const import ( + DOMAIN, + PRICE_LEVEL_CHEAP, + PRICE_LEVEL_EXPENSIVE, + PRICE_LEVEL_NORMAL, + PRICE_LEVEL_VERY_CHEAP, + PRICE_LEVEL_VERY_EXPENSIVE, + PRICE_RATING_HIGH, + PRICE_RATING_LOW, + PRICE_RATING_NORMAL, + format_price_unit_minor, + get_translation, +) +from homeassistant.exceptions import ServiceValidationError +from homeassistant.helpers.entity_registry import async_get as async_get_entity_registry + +from .formatters import get_level_translation +from .helpers import get_entry_and_data + +if TYPE_CHECKING: + from homeassistant.core import ServiceCall + +# Service constants +APEXCHARTS_YAML_SERVICE_NAME: Final = "get_apexcharts_yaml" +ATTR_DAY: Final = "day" +ATTR_ENTRY_ID: Final = "entry_id" + +# Service schema +APEXCHARTS_SERVICE_SCHEMA: Final = vol.Schema( + { + vol.Required(ATTR_ENTRY_ID): str, + vol.Optional("day", default="today"): vol.In(["yesterday", "today", "tomorrow"]), + vol.Optional("level_type", default="rating_level"): vol.In(["rating_level", "level"]), + } +) + + +async def handle_apexcharts_yaml(call: ServiceCall) -> dict[str, Any]: + """ + Return YAML snippet for ApexCharts card. + + Generates a complete ApexCharts card configuration with: + - Separate series for each price level/rating (color-coded) + - Automatic data fetching via get_chartdata service + - Translated labels and titles + - Clean gap visualization with NULL insertion + + See services.yaml for detailed parameter documentation. + + Args: + call: Service call with parameters + + Returns: + Dictionary with ApexCharts card configuration + + Raises: + ServiceValidationError: If entry_id is missing or invalid + + """ + hass = call.hass + entry_id_raw = call.data.get(ATTR_ENTRY_ID) + if entry_id_raw is None: + raise ServiceValidationError(translation_domain=DOMAIN, translation_key="missing_entry_id") + entry_id: str = str(entry_id_raw) + + day = call.data.get("day", "today") + level_type = call.data.get("level_type", "rating_level") + + # Get user's language from hass config + user_language = hass.config.language or "en" + + # Get coordinator to access price data (for currency) + _, coordinator, _ = get_entry_and_data(hass, entry_id) + price_info = coordinator.data.get("priceInfo", {}) + currency = price_info.get("currency", "EUR") + price_unit = format_price_unit_minor(currency) + + # Get a sample entity_id for the series (first sensor from this entry) + entity_registry = async_get_entity_registry(hass) + sample_entity = None + for entity in entity_registry.entities.values(): + if entity.config_entry_id == entry_id and entity.domain == "sensor": + sample_entity = entity.entity_id + break + + if level_type == "rating_level": + series_levels = [ + (PRICE_RATING_LOW, "#2ecc71"), + (PRICE_RATING_NORMAL, "#f1c40f"), + (PRICE_RATING_HIGH, "#e74c3c"), + ] + else: + series_levels = [ + (PRICE_LEVEL_VERY_CHEAP, "#2ecc71"), + (PRICE_LEVEL_CHEAP, "#27ae60"), + (PRICE_LEVEL_NORMAL, "#f1c40f"), + (PRICE_LEVEL_EXPENSIVE, "#e67e22"), + (PRICE_LEVEL_VERY_EXPENSIVE, "#e74c3c"), + ] + series = [] + for level_key, color in series_levels: + # Get translated name for the level using helper function + name = get_level_translation(level_key, level_type, user_language) + # Use server-side insert_nulls='segments' for clean gaps + if level_type == "rating_level": + filter_param = f"rating_level_filter: ['{level_key}']" + else: + filter_param = f"level_filter: ['{level_key}']" + + data_generator = ( + f"const response = await hass.callWS({{ " + f"type: 'call_service', " + f"domain: 'tibber_prices', " + f"service: 'get_chartdata', " + f"return_response: true, " + f"service_data: {{ entry_id: '{entry_id}', day: ['{day}'], {filter_param}, " + f"output_format: 'array_of_arrays', insert_nulls: 'segments', minor_currency: true }} }}); " + f"return response.response.data;" + ) + # Only show extremas for HIGH and LOW levels (not NORMAL) + show_extremas = level_key != "NORMAL" + series.append( + { + "entity": sample_entity or "sensor.tibber_prices", + "name": name, + "type": "area", + "color": color, + "yaxis_id": "price", + "show": {"extremas": show_extremas, "legend_value": False}, + "data_generator": data_generator, + "stroke_width": 1, + } + ) + + # Get translated title based on level_type + title_key = "title_rating_level" if level_type == "rating_level" else "title_level" + title = get_translation(["apexcharts", title_key], user_language) or ( + "Price Phases Daily Progress" if level_type == "rating_level" else "Price Level" + ) + + # Add translated day to title + day_translated = get_translation(["selector", "day", "options", day], user_language) or day.capitalize() + title = f"{title} - {day_translated}" + + # Configure span based on selected day + if day == "yesterday": + span_config = {"start": "day", "offset": "-1d"} + elif day == "tomorrow": + span_config = {"start": "day", "offset": "+1d"} + else: # today + span_config = {"start": "day"} + + return { + "type": "custom:apexcharts-card", + "update_interval": "5m", + "span": span_config, + "header": { + "show": True, + "title": title, + "show_states": False, + }, + "apex_config": { + "chart": { + "animations": {"enabled": False}, + "toolbar": {"show": True, "tools": {"zoom": True, "pan": True}}, + "zoom": {"enabled": True}, + }, + "stroke": {"curve": "stepline", "width": 2}, + "fill": { + "type": "gradient", + "opacity": 0.4, + "gradient": { + "shade": "dark", + "type": "vertical", + "shadeIntensity": 0.5, + "opacityFrom": 0.7, + "opacityTo": 0.2, + }, + }, + "dataLabels": {"enabled": False}, + "tooltip": { + "x": {"format": "HH:mm"}, + "y": {"title": {"formatter": f"function() {{ return '{price_unit}'; }}"}}, + }, + "legend": { + "show": True, + "position": "top", + "horizontalAlign": "left", + "markers": {"radius": 2}, + }, + "grid": { + "show": True, + "borderColor": "#40475D", + "strokeDashArray": 4, + "xaxis": {"lines": {"show": True}}, + "yaxis": {"lines": {"show": True}}, + }, + "markers": {"size": 0}, + }, + "yaxis": [ + { + "id": "price", + "decimals": 2, + "min": 0, + "apex_config": {"title": {"text": price_unit}}, + }, + ], + "now": {"show": True, "color": "#8e24aa", "label": "🕒 LIVE"}, + "all_series_config": { + "stroke_width": 1, + "group_by": {"func": "raw", "duration": "15min"}, + }, + "series": series, + } diff --git a/custom_components/tibber_prices/services.py b/custom_components/tibber_prices/services/chartdata.py similarity index 51% rename from custom_components/tibber_prices/services.py rename to custom_components/tibber_prices/services/chartdata.py index f920cbc..4643550 100644 --- a/custom_components/tibber_prices/services.py +++ b/custom_components/tibber_prices/services/chartdata.py @@ -1,24 +1,33 @@ -"""Services for Tibber Prices integration.""" +""" +Chart data export service handler. + +This module implements the `get_chartdata` service, which exports price data in various +formats for chart visualization (ApexCharts, custom dashboards, external integrations). + +Features: +- Multiple output formats (array_of_objects, array_of_arrays) +- Custom field naming +- Level/rating filtering +- Period filtering (best_price, peak_price) +- Resolution options (15min intervals, hourly aggregation) +- NULL insertion modes for clean gap visualization +- Currency conversion (major/minor units) +- Custom decimal rounding + +Service: tibber_prices.get_chartdata +Response: JSON with chart-ready data + +""" from __future__ import annotations import re from datetime import timedelta -from typing import Any, Final +from typing import TYPE_CHECKING, Any, Final import voluptuous as vol -from homeassistant.core import HomeAssistant, ServiceCall, SupportsResponse, callback -from homeassistant.exceptions import ServiceValidationError -from homeassistant.helpers.entity_registry import async_get as async_get_entity_registry -from homeassistant.util import dt as dt_util - -from .api import ( - TibberPricesApiClientAuthenticationError, - TibberPricesApiClientCommunicationError, - TibberPricesApiClientError, -) -from .const import ( +from custom_components.tibber_prices.const import ( CONF_PRICE_RATING_THRESHOLD_HIGH, CONF_PRICE_RATING_THRESHOLD_LOW, DEFAULT_PRICE_RATING_THRESHOLD_HIGH, @@ -32,40 +41,22 @@ from .const import ( PRICE_RATING_HIGH, PRICE_RATING_LOW, PRICE_RATING_NORMAL, - format_price_unit_minor, - get_translation, ) -from .sensor.helpers import aggregate_level_data, aggregate_rating_data +from homeassistant.exceptions import ServiceValidationError +from homeassistant.util import dt as dt_util -APEXCHARTS_YAML_SERVICE_NAME = "get_apexcharts_yaml" -CHARTDATA_SERVICE_NAME = "get_chartdata" -REFRESH_USER_DATA_SERVICE_NAME = "refresh_user_data" +from .formatters import aggregate_hourly_exact, get_period_data, normalize_level_filter, normalize_rating_level_filter +from .helpers import get_entry_and_data + +if TYPE_CHECKING: + from homeassistant.core import ServiceCall + +# Service constants +CHARTDATA_SERVICE_NAME: Final = "get_chartdata" ATTR_DAY: Final = "day" ATTR_ENTRY_ID: Final = "entry_id" -APEXCHARTS_SERVICE_SCHEMA: Final = vol.Schema( - { - vol.Required(ATTR_ENTRY_ID): str, - vol.Optional("day", default="today"): vol.In(["yesterday", "today", "tomorrow"]), - vol.Optional("level_type", default="rating_level"): vol.In(["rating_level", "level"]), - } -) - - -def _normalize_level_filter(value: list[str] | None) -> list[str] | None: - """Convert level filter values to uppercase for case-insensitive comparison.""" - if value is None: - return None - return [v.upper() for v in value] - - -def _normalize_rating_level_filter(value: list[str] | None) -> list[str] | None: - """Convert rating level filter values to uppercase for case-insensitive comparison.""" - if value is None: - return None - return [v.upper() for v in value] - - +# Service schema CHARTDATA_SERVICE_SCHEMA: Final = vol.Schema( { vol.Required(ATTR_ENTRY_ID): str, @@ -80,7 +71,7 @@ CHARTDATA_SERVICE_SCHEMA: Final = vol.Schema( vol.Optional("include_average", default=False): bool, vol.Optional("level_filter"): vol.All( vol.Coerce(list), - _normalize_level_filter, + normalize_level_filter, [ vol.In( [ @@ -95,7 +86,7 @@ CHARTDATA_SERVICE_SCHEMA: Final = vol.Schema( ), vol.Optional("rating_level_filter"): vol.All( vol.Coerce(list), - _normalize_rating_level_filter, + normalize_rating_level_filter, [vol.In([PRICE_RATING_LOW, PRICE_RATING_NORMAL, PRICE_RATING_HIGH])], ), vol.Optional("insert_nulls", default="none"): vol.In(["none", "segments", "all"]), @@ -111,294 +102,27 @@ CHARTDATA_SERVICE_SCHEMA: Final = vol.Schema( } ) -REFRESH_USER_DATA_SERVICE_SCHEMA: Final = vol.Schema( - { - vol.Required(ATTR_ENTRY_ID): str, - } -) -# --- Entry point: Service handler --- - - -def _aggregate_hourly_exact( # noqa: PLR0913, PLR0912, PLR0915 - intervals: list[dict], - start_time_field: str, - price_field: str, - *, - use_minor_currency: bool = False, - round_decimals: int | None = None, - include_level: bool = False, - include_rating_level: bool = False, - level_filter: list[str] | None = None, - rating_level_filter: list[str] | None = None, - include_average: bool = False, - level_field: str = "level", - rating_level_field: str = "rating_level", - average_field: str = "average", - day_average: float | None = None, - threshold_low: float = DEFAULT_PRICE_RATING_THRESHOLD_LOW, - threshold_high: float = DEFAULT_PRICE_RATING_THRESHOLD_HIGH, - period_timestamps: set[str] | None = None, -) -> list[dict]: +async def handle_chartdata(call: ServiceCall) -> dict[str, Any]: # noqa: PLR0912, PLR0915, C901 """ - Aggregate 15-minute intervals to exact hourly averages. + Return price data in chart-friendly format. - Each hour uses exactly 4 intervals (00:00, 00:15, 00:30, 00:45). - Returns data points at the start of each hour. - """ - if not intervals: - return [] + This service exports Tibber price data in customizable formats for chart visualization. + Supports both 15-minute intervals and hourly aggregation, with optional filtering by + price level, rating level, or period (best_price/peak_price). - hourly_data = [] - i = 0 - - while i < len(intervals): - interval = intervals[i] - start_time_str = interval.get("startsAt") - - if not start_time_str: - i += 1 - continue - - # Parse the timestamp - start_time = dt_util.parse_datetime(start_time_str) - if not start_time: - i += 1 - continue - - # Check if this is the start of an hour (:00) - if start_time.minute != 0: - i += 1 - continue - - # Collect 4 intervals for this hour (with optional filtering) - hour_intervals = [] - hour_interval_data = [] # Complete interval data for aggregation functions - for j in range(4): - if i + j < len(intervals): - interval = intervals[i + j] - - # Apply period filter if specified (check startsAt timestamp) - if period_timestamps is not None: - interval_start = interval.get("startsAt") - if interval_start and interval_start not in period_timestamps: - continue - - # Apply level filter if specified - if level_filter is not None and "level" in interval and interval["level"] not in level_filter: - continue - - # Apply rating_level filter if specified - if ( - rating_level_filter is not None - and "rating_level" in interval - and interval["rating_level"] not in rating_level_filter - ): - continue - - price = interval.get("total") - if price is not None: - hour_intervals.append(price) - hour_interval_data.append(interval) - - # Calculate average if we have data - if hour_intervals: - avg_price = sum(hour_intervals) / len(hour_intervals) - - # Convert to minor currency (cents/øre) if requested - avg_price = round(avg_price * 100, 2) if use_minor_currency else round(avg_price, 4) - - # Apply custom rounding if specified - if round_decimals is not None: - avg_price = round(avg_price, round_decimals) - - data_point = {start_time_field: start_time_str, price_field: avg_price} - - # Add aggregated level using same logic as sensors - if include_level and hour_interval_data: - aggregated_level = aggregate_level_data(hour_interval_data) - if aggregated_level: - data_point[level_field] = aggregated_level.upper() # Convert back to uppercase - - # Add aggregated rating_level using same logic as sensors - if include_rating_level and hour_interval_data: - aggregated_rating = aggregate_rating_data(hour_interval_data, threshold_low, threshold_high) - if aggregated_rating: - data_point[rating_level_field] = aggregated_rating.upper() # Convert back to uppercase - - # Add average if requested - if include_average and day_average is not None: - data_point[average_field] = day_average - - hourly_data.append(data_point) - - # Move to next hour (skip 4 intervals) - i += 4 - - return hourly_data - - -def _get_period_data( # noqa: PLR0913, PLR0912, PLR0915 - *, - coordinator: Any, - period_filter: str, - days: list[str], - output_format: str, - minor_currency: bool, - round_decimals: int | None, - level_filter: list[str] | None, - rating_level_filter: list[str] | None, - include_level: bool, - include_rating_level: bool, - start_time_field: str, - end_time_field: str, - price_field: str, - level_field: str, - rating_level_field: str, - data_key: str, - add_trailing_null: bool, -) -> dict[str, Any]: - """ - Get period summary data instead of interval data. - - When period_filter is specified, returns the precomputed period summaries - from the coordinator instead of filtering intervals. - - Note: Period prices (price_avg) are stored in minor currency units (ct/øre). - They are converted to major currency unless minor_currency=True. + See services.yaml for detailed parameter documentation. Args: - coordinator: Data coordinator with period summaries - period_filter: "best_price" or "peak_price" - days: List of days to include - output_format: "array_of_objects" or "array_of_arrays" - minor_currency: If False, convert prices from minor to major units - round_decimals: Optional decimal rounding - level_filter: Optional level filter - rating_level_filter: Optional rating level filter - include_level: Whether to include level field in output - include_rating_level: Whether to include rating_level field in output - start_time_field: Custom name for start time field - end_time_field: Custom name for end time field - price_field: Custom name for price field - level_field: Custom name for level field - rating_level_field: Custom name for rating_level field - data_key: Top-level key name in response - add_trailing_null: Whether to add trailing null point + call: Service call with parameters Returns: - Dictionary with period data in requested format + Dictionary with chart data in requested format + + Raises: + ServiceValidationError: If entry_id is missing or invalid """ - periods_data = coordinator.data.get("periods", {}) - period_data = periods_data.get(period_filter) - - if not period_data: - return {data_key: []} - - period_summaries = period_data.get("periods", []) - if not period_summaries: - return {data_key: []} - - chart_data = [] - - # Filter periods by day if requested - filtered_periods = [] - if days: - # Build set of allowed dates - allowed_dates = set() - for day in days: - # Map day names to actual dates from coordinator - price_info = coordinator.data.get("priceInfo", {}) - day_prices = price_info.get(day, []) - if day_prices: - # Extract date from first interval - first_interval = day_prices[0] - starts_at = first_interval.get("startsAt") - if starts_at: - dt = dt_util.parse_datetime(starts_at) - if dt: - dt = dt_util.as_local(dt) - allowed_dates.add(dt.date()) - - # Filter periods to those within allowed dates - for period in period_summaries: - start = period.get("start") - if start and start.date() in allowed_dates: - filtered_periods.append(period) - else: - filtered_periods = period_summaries - - # Apply level and rating_level filters - for period in filtered_periods: - # Apply level filter (normalize to uppercase for comparison) - if level_filter and "level" in period and period["level"].upper() not in level_filter: - continue - - # Apply rating_level filter (normalize to uppercase for comparison) - if ( - rating_level_filter - and "rating_level" in period - and period["rating_level"].upper() not in rating_level_filter - ): - continue - - # Build data point based on output format - if output_format == "array_of_objects": - # Map period fields to custom field names - # Period has: start, end, level, rating_level, price_avg, price_min, price_max - data_point = {} - - # Start time - data_point[start_time_field] = period["start"] - - # End time - data_point[end_time_field] = period.get("end") - - # Price (use price_avg from period, stored in minor units) - price_avg = period.get("price_avg", 0.0) - # Convert to major currency unless minor_currency=True - if not minor_currency: - price_avg = price_avg / 100 - if round_decimals is not None: - price_avg = round(price_avg, round_decimals) - data_point[price_field] = price_avg - - # Level (only if requested and present) - if include_level and "level" in period: - data_point[level_field] = period["level"].upper() - - # Rating level (only if requested and present) - if include_rating_level and "rating_level" in period: - data_point[rating_level_field] = period["rating_level"].upper() - - chart_data.append(data_point) - - else: # array_of_arrays - # For array_of_arrays, include: [start, price_avg] - price_avg = period.get("price_avg", 0.0) - # Convert to major currency unless minor_currency=True - if not minor_currency: - price_avg = price_avg / 100 - if round_decimals is not None: - price_avg = round(price_avg, round_decimals) - chart_data.append([period["start"], price_avg]) - - # Add trailing null point if requested - if add_trailing_null and chart_data: - if output_format == "array_of_objects": - null_point = {start_time_field: None, end_time_field: None} - for field in [price_field, level_field, rating_level_field]: - null_point[field] = None - chart_data.append(null_point) - else: # array_of_arrays - chart_data.append([None, None]) - - return {data_key: chart_data} - - -async def _get_chartdata(call: ServiceCall) -> dict[str, Any]: # noqa: PLR0912, PLR0915, C901 - """Return price data in a simple chart-friendly format similar to Tibber Core integration.""" hass = call.hass entry_id_raw = call.data.get(ATTR_ENTRY_ID) if entry_id_raw is None: @@ -446,7 +170,7 @@ async def _get_chartdata(call: ServiceCall) -> dict[str, Any]: # noqa: PLR0912, if average_field in array_fields_template: include_average = True - _, coordinator, _ = _get_entry_and_data(hass, entry_id) + _, coordinator, _ = get_entry_and_data(hass, entry_id) # Get thresholds from config for rating aggregation threshold_low = coordinator.config_entry.options.get( @@ -460,7 +184,7 @@ async def _get_chartdata(call: ServiceCall) -> dict[str, Any]: # noqa: PLR0912, # When period_filter is set, return period summaries instead of interval data # Period summaries are already complete objects with aggregated data if period_filter: - return _get_period_data( + return get_period_data( coordinator=coordinator, period_filter=period_filter, days=days, @@ -755,7 +479,7 @@ async def _get_chartdata(call: ServiceCall) -> dict[str, Any]: # noqa: PLR0912, elif resolution == "hourly": # Hourly averages (4 intervals per hour: :00, :15, :30, :45) chart_data.extend( - _aggregate_hourly_exact( + aggregate_hourly_exact( day_prices, start_time_field, price_field, @@ -825,272 +549,3 @@ async def _get_chartdata(call: ServiceCall) -> dict[str, Any]: # noqa: PLR0912, chart_data.append(null_point) return {data_key: chart_data} - - -def _get_level_translation(level_key: str, level_type: str, language: str) -> str: - """Get translated name for a price level or rating level.""" - level_key_lower = level_key.lower() - # Use correct translation key based on level_type - if level_type == "rating_level": - name = get_translation(["selector", "rating_level_filter", "options", level_key_lower], language) - else: - name = get_translation(["selector", "level_filter", "options", level_key_lower], language) - # Fallback to original key if translation not found - return name or level_key - - -async def _get_apexcharts_yaml(call: ServiceCall) -> dict[str, Any]: - """Return a YAML snippet for an ApexCharts card using the get_apexcharts_data service for each level.""" - hass = call.hass - entry_id_raw = call.data.get(ATTR_ENTRY_ID) - if entry_id_raw is None: - raise ServiceValidationError(translation_domain=DOMAIN, translation_key="missing_entry_id") - entry_id: str = str(entry_id_raw) - - day = call.data.get("day", "today") - level_type = call.data.get("level_type", "rating_level") - - # Get user's language from hass config - user_language = hass.config.language or "en" - - # Get coordinator to access price data (for currency) - _, coordinator, _ = _get_entry_and_data(hass, entry_id) - price_info = coordinator.data.get("priceInfo", {}) - currency = price_info.get("currency", "EUR") - price_unit = format_price_unit_minor(currency) - - # Get a sample entity_id for the series (first sensor from this entry) - entity_registry = async_get_entity_registry(hass) - sample_entity = None - for entity in entity_registry.entities.values(): - if entity.config_entry_id == entry_id and entity.domain == "sensor": - sample_entity = entity.entity_id - break - - if level_type == "rating_level": - series_levels = [ - (PRICE_RATING_LOW, "#2ecc71"), - (PRICE_RATING_NORMAL, "#f1c40f"), - (PRICE_RATING_HIGH, "#e74c3c"), - ] - else: - series_levels = [ - (PRICE_LEVEL_VERY_CHEAP, "#2ecc71"), - (PRICE_LEVEL_CHEAP, "#27ae60"), - (PRICE_LEVEL_NORMAL, "#f1c40f"), - (PRICE_LEVEL_EXPENSIVE, "#e67e22"), - (PRICE_LEVEL_VERY_EXPENSIVE, "#e74c3c"), - ] - series = [] - for level_key, color in series_levels: - # Get translated name for the level using helper function - name = _get_level_translation(level_key, level_type, user_language) - # Use server-side insert_nulls='segments' for clean gaps - if level_type == "rating_level": - filter_param = f"rating_level_filter: ['{level_key}']" - else: - filter_param = f"level_filter: ['{level_key}']" - - data_generator = ( - f"const response = await hass.callWS({{ " - f"type: 'call_service', " - f"domain: 'tibber_prices', " - f"service: 'get_chartdata', " - f"return_response: true, " - f"service_data: {{ entry_id: '{entry_id}', day: ['{day}'], {filter_param}, " - f"output_format: 'array_of_arrays', insert_nulls: 'segments', minor_currency: true }} }}); " - f"return response.response.data;" - ) - # Only show extremas for HIGH and LOW levels (not NORMAL) - show_extremas = level_key != "NORMAL" - series.append( - { - "entity": sample_entity or "sensor.tibber_prices", - "name": name, - "type": "area", - "color": color, - "yaxis_id": "price", - "show": {"extremas": show_extremas, "legend_value": False}, - "data_generator": data_generator, - "stroke_width": 1, - } - ) - - # Get translated title based on level_type - title_key = "title_rating_level" if level_type == "rating_level" else "title_level" - title = get_translation(["apexcharts", title_key], user_language) or ( - "Price Phases Daily Progress" if level_type == "rating_level" else "Price Level" - ) - - # Add translated day to title - day_translated = get_translation(["selector", "day", "options", day], user_language) or day.capitalize() - title = f"{title} - {day_translated}" - - # Configure span based on selected day - if day == "yesterday": - span_config = {"start": "day", "offset": "-1d"} - elif day == "tomorrow": - span_config = {"start": "day", "offset": "+1d"} - else: # today - span_config = {"start": "day"} - - return { - "type": "custom:apexcharts-card", - "update_interval": "5m", - "span": span_config, - "header": { - "show": True, - "title": title, - "show_states": False, - }, - "apex_config": { - "chart": { - "animations": {"enabled": False}, - "toolbar": {"show": True, "tools": {"zoom": True, "pan": True}}, - "zoom": {"enabled": True}, - }, - "stroke": {"curve": "stepline", "width": 2}, - "fill": { - "type": "gradient", - "opacity": 0.4, - "gradient": { - "shade": "dark", - "type": "vertical", - "shadeIntensity": 0.5, - "opacityFrom": 0.7, - "opacityTo": 0.2, - }, - }, - "dataLabels": {"enabled": False}, - "tooltip": { - "x": {"format": "HH:mm"}, - "y": {"title": {"formatter": f"function() {{ return '{price_unit}'; }}"}}, - }, - "legend": { - "show": True, - "position": "top", - "horizontalAlign": "left", - "markers": {"radius": 2}, - }, - "grid": { - "show": True, - "borderColor": "#40475D", - "strokeDashArray": 4, - "xaxis": {"lines": {"show": True}}, - "yaxis": {"lines": {"show": True}}, - }, - "markers": {"size": 0}, - }, - "yaxis": [ - { - "id": "price", - "decimals": 2, - "min": 0, - "apex_config": {"title": {"text": price_unit}}, - }, - ], - "now": {"show": True, "color": "#8e24aa", "label": "🕒 LIVE"}, - "all_series_config": { - "stroke_width": 1, - "group_by": {"func": "raw", "duration": "15min"}, - }, - "series": series, - } - - -async def _refresh_user_data(call: ServiceCall) -> dict[str, Any]: - """Refresh user data for a specific config entry and return updated information.""" - entry_id = call.data.get(ATTR_ENTRY_ID) - hass = call.hass - - if not entry_id: - return { - "success": False, - "message": "Entry ID is required", - } - - # Get the entry and coordinator - try: - _, coordinator, _ = _get_entry_and_data(hass, entry_id) - except ServiceValidationError as ex: - return { - "success": False, - "message": f"Invalid entry ID: {ex}", - } - - # Force refresh user data using the public method - try: - updated = await coordinator.refresh_user_data() - except ( - TibberPricesApiClientAuthenticationError, - TibberPricesApiClientCommunicationError, - TibberPricesApiClientError, - ) as ex: - return { - "success": False, - "message": f"API error refreshing user data: {ex!s}", - } - else: - if updated: - user_profile = coordinator.get_user_profile() - homes = coordinator.get_user_homes() - - return { - "success": True, - "message": "User data refreshed successfully", - "user_profile": user_profile, - "homes_count": len(homes), - "homes": homes, - "last_updated": user_profile.get("last_updated"), - } - return { - "success": False, - "message": "User data was already up to date", - } - - -# --- Helpers --- - - -def _get_entry_and_data(hass: HomeAssistant, entry_id: str) -> tuple[Any, Any, dict]: - """Validate entry and extract coordinator and data.""" - if not entry_id: - raise ServiceValidationError(translation_domain=DOMAIN, translation_key="missing_entry_id") - entry = next( - (e for e in hass.config_entries.async_entries(DOMAIN) if e.entry_id == entry_id), - None, - ) - if not entry or not hasattr(entry, "runtime_data") or not entry.runtime_data: - raise ServiceValidationError(translation_domain=DOMAIN, translation_key="invalid_entry_id") - coordinator = entry.runtime_data.coordinator - data = coordinator.data or {} - return entry, coordinator, data - - -# --- Service registration --- - - -@callback -def async_setup_services(hass: HomeAssistant) -> None: - """Set up services for Tibber Prices integration.""" - hass.services.async_register( - DOMAIN, - APEXCHARTS_YAML_SERVICE_NAME, - _get_apexcharts_yaml, - schema=APEXCHARTS_SERVICE_SCHEMA, - supports_response=SupportsResponse.ONLY, - ) - hass.services.async_register( - DOMAIN, - CHARTDATA_SERVICE_NAME, - _get_chartdata, - schema=CHARTDATA_SERVICE_SCHEMA, - supports_response=SupportsResponse.ONLY, - ) - hass.services.async_register( - DOMAIN, - REFRESH_USER_DATA_SERVICE_NAME, - _refresh_user_data, - schema=REFRESH_USER_DATA_SERVICE_SCHEMA, - supports_response=SupportsResponse.ONLY, - ) diff --git a/custom_components/tibber_prices/services/formatters.py b/custom_components/tibber_prices/services/formatters.py new file mode 100644 index 0000000..d136b78 --- /dev/null +++ b/custom_components/tibber_prices/services/formatters.py @@ -0,0 +1,357 @@ +""" +Data formatting utilities for services. + +This module contains data transformation and formatting functions used across +multiple service handlers, including level normalization, hourly aggregation, +and period data extraction. + +Functions: + normalize_level_filter: Convert level filter values to uppercase + normalize_rating_level_filter: Convert rating level filter values to uppercase + aggregate_hourly_exact: Aggregate 15-minute intervals to exact hourly averages + get_period_data: Extract period summary data instead of interval data + get_level_translation: Get translated name for price level or rating level + +Used by: + - services/chartdata.py: Main data export service + - services/apexcharts.py: ApexCharts YAML generation + +""" + +from __future__ import annotations + +from typing import Any + +from custom_components.tibber_prices.const import ( + DEFAULT_PRICE_RATING_THRESHOLD_HIGH, + DEFAULT_PRICE_RATING_THRESHOLD_LOW, + get_translation, +) +from custom_components.tibber_prices.sensor.helpers import aggregate_level_data, aggregate_rating_data +from homeassistant.util import dt as dt_util + + +def normalize_level_filter(value: list[str] | None) -> list[str] | None: + """Convert level filter values to uppercase for case-insensitive comparison.""" + if value is None: + return None + return [v.upper() for v in value] + + +def normalize_rating_level_filter(value: list[str] | None) -> list[str] | None: + """Convert rating level filter values to uppercase for case-insensitive comparison.""" + if value is None: + return None + return [v.upper() for v in value] + + +def aggregate_hourly_exact( # noqa: PLR0913, PLR0912, PLR0915 + intervals: list[dict], + start_time_field: str, + price_field: str, + *, + use_minor_currency: bool = False, + round_decimals: int | None = None, + include_level: bool = False, + include_rating_level: bool = False, + level_filter: list[str] | None = None, + rating_level_filter: list[str] | None = None, + include_average: bool = False, + level_field: str = "level", + rating_level_field: str = "rating_level", + average_field: str = "average", + day_average: float | None = None, + threshold_low: float = DEFAULT_PRICE_RATING_THRESHOLD_LOW, + threshold_high: float = DEFAULT_PRICE_RATING_THRESHOLD_HIGH, + period_timestamps: set[str] | None = None, +) -> list[dict]: + """ + Aggregate 15-minute intervals to exact hourly averages. + + Each hour uses exactly 4 intervals (00:00, 00:15, 00:30, 00:45). + Returns data points at the start of each hour. + + Args: + intervals: List of 15-minute price intervals + start_time_field: Custom name for start time field + price_field: Custom name for price field + use_minor_currency: Convert to minor currency units (cents/øre) + round_decimals: Optional decimal rounding + include_level: Include aggregated level field + include_rating_level: Include aggregated rating_level field + level_filter: Filter intervals by level values + rating_level_filter: Filter intervals by rating_level values + include_average: Include day average in output + level_field: Custom name for level field + rating_level_field: Custom name for rating_level field + average_field: Custom name for average field + day_average: Day average value to include + threshold_low: Rating level threshold (low/normal boundary) + threshold_high: Rating level threshold (normal/high boundary) + period_timestamps: Set of timestamps to filter by (period filter) + + Returns: + List of hourly data points with aggregated values + + """ + if not intervals: + return [] + + hourly_data = [] + i = 0 + + while i < len(intervals): + interval = intervals[i] + start_time_str = interval.get("startsAt") + + if not start_time_str: + i += 1 + continue + + # Parse the timestamp + start_time = dt_util.parse_datetime(start_time_str) + if not start_time: + i += 1 + continue + + # Check if this is the start of an hour (:00) + if start_time.minute != 0: + i += 1 + continue + + # Collect 4 intervals for this hour (with optional filtering) + hour_intervals = [] + hour_interval_data = [] # Complete interval data for aggregation functions + for j in range(4): + if i + j < len(intervals): + interval = intervals[i + j] + + # Apply period filter if specified (check startsAt timestamp) + if period_timestamps is not None: + interval_start = interval.get("startsAt") + if interval_start and interval_start not in period_timestamps: + continue + + # Apply level filter if specified + if level_filter is not None and "level" in interval and interval["level"] not in level_filter: + continue + + # Apply rating_level filter if specified + if ( + rating_level_filter is not None + and "rating_level" in interval + and interval["rating_level"] not in rating_level_filter + ): + continue + + price = interval.get("total") + if price is not None: + hour_intervals.append(price) + hour_interval_data.append(interval) + + # Calculate average if we have data + if hour_intervals: + avg_price = sum(hour_intervals) / len(hour_intervals) + + # Convert to minor currency (cents/øre) if requested + avg_price = round(avg_price * 100, 2) if use_minor_currency else round(avg_price, 4) + + # Apply custom rounding if specified + if round_decimals is not None: + avg_price = round(avg_price, round_decimals) + + data_point = {start_time_field: start_time_str, price_field: avg_price} + + # Add aggregated level using same logic as sensors + if include_level and hour_interval_data: + aggregated_level = aggregate_level_data(hour_interval_data) + if aggregated_level: + data_point[level_field] = aggregated_level.upper() # Convert back to uppercase + + # Add aggregated rating_level using same logic as sensors + if include_rating_level and hour_interval_data: + aggregated_rating = aggregate_rating_data(hour_interval_data, threshold_low, threshold_high) + if aggregated_rating: + data_point[rating_level_field] = aggregated_rating.upper() # Convert back to uppercase + + # Add average if requested + if include_average and day_average is not None: + data_point[average_field] = day_average + + hourly_data.append(data_point) + + # Move to next hour (skip 4 intervals) + i += 4 + + return hourly_data + + +def get_period_data( # noqa: PLR0913, PLR0912, PLR0915 + *, + coordinator: Any, + period_filter: str, + days: list[str], + output_format: str, + minor_currency: bool, + round_decimals: int | None, + level_filter: list[str] | None, + rating_level_filter: list[str] | None, + include_level: bool, + include_rating_level: bool, + start_time_field: str, + end_time_field: str, + price_field: str, + level_field: str, + rating_level_field: str, + data_key: str, + add_trailing_null: bool, +) -> dict[str, Any]: + """ + Get period summary data instead of interval data. + + When period_filter is specified, returns the precomputed period summaries + from the coordinator instead of filtering intervals. + + Note: Period prices (price_avg) are stored in minor currency units (ct/øre). + They are converted to major currency unless minor_currency=True. + + Args: + coordinator: Data coordinator with period summaries + period_filter: "best_price" or "peak_price" + days: List of days to include + output_format: "array_of_objects" or "array_of_arrays" + minor_currency: If False, convert prices from minor to major units + round_decimals: Optional decimal rounding + level_filter: Optional level filter + rating_level_filter: Optional rating level filter + include_level: Whether to include level field in output + include_rating_level: Whether to include rating_level field in output + start_time_field: Custom name for start time field + end_time_field: Custom name for end time field + price_field: Custom name for price field + level_field: Custom name for level field + rating_level_field: Custom name for rating_level field + data_key: Top-level key name in response + add_trailing_null: Whether to add trailing null point + + Returns: + Dictionary with period data in requested format + + """ + periods_data = coordinator.data.get("periods", {}) + period_data = periods_data.get(period_filter) + + if not period_data: + return {data_key: []} + + period_summaries = period_data.get("periods", []) + if not period_summaries: + return {data_key: []} + + chart_data = [] + + # Filter periods by day if requested + filtered_periods = [] + if days: + # Build set of allowed dates + allowed_dates = set() + for day in days: + # Map day names to actual dates from coordinator + price_info = coordinator.data.get("priceInfo", {}) + day_prices = price_info.get(day, []) + if day_prices: + # Extract date from first interval + first_interval = day_prices[0] + starts_at = first_interval.get("startsAt") + if starts_at: + dt = dt_util.parse_datetime(starts_at) + if dt: + dt = dt_util.as_local(dt) + allowed_dates.add(dt.date()) + + # Filter periods to those within allowed dates + for period in period_summaries: + start = period.get("start") + if start and start.date() in allowed_dates: + filtered_periods.append(period) + else: + filtered_periods = period_summaries + + # Apply level and rating_level filters + for period in filtered_periods: + # Apply level filter (normalize to uppercase for comparison) + if level_filter and "level" in period and period["level"].upper() not in level_filter: + continue + + # Apply rating_level filter (normalize to uppercase for comparison) + if ( + rating_level_filter + and "rating_level" in period + and period["rating_level"].upper() not in rating_level_filter + ): + continue + + # Build data point based on output format + if output_format == "array_of_objects": + # Map period fields to custom field names + # Period has: start, end, level, rating_level, price_avg, price_min, price_max + data_point = {} + + # Start time + data_point[start_time_field] = period["start"] + + # End time + data_point[end_time_field] = period.get("end") + + # Price (use price_avg from period, stored in minor units) + price_avg = period.get("price_avg", 0.0) + # Convert to major currency unless minor_currency=True + if not minor_currency: + price_avg = price_avg / 100 + if round_decimals is not None: + price_avg = round(price_avg, round_decimals) + data_point[price_field] = price_avg + + # Level (only if requested and present) + if include_level and "level" in period: + data_point[level_field] = period["level"].upper() + + # Rating level (only if requested and present) + if include_rating_level and "rating_level" in period: + data_point[rating_level_field] = period["rating_level"].upper() + + chart_data.append(data_point) + + else: # array_of_arrays + # For array_of_arrays, include: [start, price_avg] + price_avg = period.get("price_avg", 0.0) + # Convert to major currency unless minor_currency=True + if not minor_currency: + price_avg = price_avg / 100 + if round_decimals is not None: + price_avg = round(price_avg, round_decimals) + chart_data.append([period["start"], price_avg]) + + # Add trailing null point if requested + if add_trailing_null and chart_data: + if output_format == "array_of_objects": + null_point = {start_time_field: None, end_time_field: None} + for field in [price_field, level_field, rating_level_field]: + null_point[field] = None + chart_data.append(null_point) + else: # array_of_arrays + chart_data.append([None, None]) + + return {data_key: chart_data} + + +def get_level_translation(level_key: str, level_type: str, language: str) -> str: + """Get translated name for a price level or rating level.""" + level_key_lower = level_key.lower() + # Use correct translation key based on level_type + if level_type == "rating_level": + name = get_translation(["selector", "rating_level_filter", "options", level_key_lower], language) + else: + name = get_translation(["selector", "level_filter", "options", level_key_lower], language) + # Fallback to original key if translation not found + return name or level_key diff --git a/custom_components/tibber_prices/services/helpers.py b/custom_components/tibber_prices/services/helpers.py new file mode 100644 index 0000000..ab06651 --- /dev/null +++ b/custom_components/tibber_prices/services/helpers.py @@ -0,0 +1,53 @@ +""" +Shared utilities for service handlers. + +This module provides common helper functions used across multiple service handlers, +such as entry validation and data extraction. + +Functions: + get_entry_and_data: Validate config entry and extract coordinator data + +Used by: + - services/chartdata.py: Chart data export service + - services/apexcharts.py: ApexCharts YAML generation + - services/refresh_user_data.py: User data refresh + +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from custom_components.tibber_prices.const import DOMAIN +from homeassistant.exceptions import ServiceValidationError + +if TYPE_CHECKING: + from homeassistant.core import HomeAssistant + + +def get_entry_and_data(hass: HomeAssistant, entry_id: str) -> tuple[Any, Any, dict]: + """ + Validate entry and extract coordinator and data. + + Args: + hass: Home Assistant instance + entry_id: Config entry ID to validate + + Returns: + Tuple of (entry, coordinator, data) + + Raises: + ServiceValidationError: If entry_id is missing or invalid + + """ + if not entry_id: + raise ServiceValidationError(translation_domain=DOMAIN, translation_key="missing_entry_id") + entry = next( + (e for e in hass.config_entries.async_entries(DOMAIN) if e.entry_id == entry_id), + None, + ) + if not entry or not hasattr(entry, "runtime_data") or not entry.runtime_data: + raise ServiceValidationError(translation_domain=DOMAIN, translation_key="invalid_entry_id") + coordinator = entry.runtime_data.coordinator + data = coordinator.data or {} + return entry, coordinator, data diff --git a/custom_components/tibber_prices/services/refresh_user_data.py b/custom_components/tibber_prices/services/refresh_user_data.py new file mode 100644 index 0000000..858b889 --- /dev/null +++ b/custom_components/tibber_prices/services/refresh_user_data.py @@ -0,0 +1,113 @@ +""" +User data refresh service handler. + +This module implements the `refresh_user_data` service, which forces a refresh +of user profile and home information from the Tibber API. + +Features: +- Force refresh of cached user data +- Bypass 24h cache TTL +- Return updated user profile and homes +- Error handling for API failures + +Service: tibber_prices.refresh_user_data +Response: JSON with refresh status and updated data + +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any, Final + +import voluptuous as vol + +from custom_components.tibber_prices.api import ( + TibberPricesApiClientAuthenticationError, + TibberPricesApiClientCommunicationError, + TibberPricesApiClientError, +) +from homeassistant.exceptions import ServiceValidationError + +from .helpers import get_entry_and_data + +if TYPE_CHECKING: + from homeassistant.core import ServiceCall + +# Service constants +REFRESH_USER_DATA_SERVICE_NAME: Final = "refresh_user_data" +ATTR_ENTRY_ID: Final = "entry_id" + +# Service schema +REFRESH_USER_DATA_SERVICE_SCHEMA: Final = vol.Schema( + { + vol.Required(ATTR_ENTRY_ID): str, + } +) + + +async def handle_refresh_user_data(call: ServiceCall) -> dict[str, Any]: + """ + Refresh user data for a specific config entry. + + Forces a refresh of user profile and home information from Tibber API, + bypassing the 24-hour cache TTL. Returns updated information or error details. + + See services.yaml for detailed parameter documentation. + + Args: + call: Service call with parameters + + Returns: + Dictionary with refresh status and updated data + + Raises: + ServiceValidationError: If entry_id is missing or invalid + + """ + entry_id = call.data.get(ATTR_ENTRY_ID) + hass = call.hass + + if not entry_id: + return { + "success": False, + "message": "Entry ID is required", + } + + # Get the entry and coordinator + try: + _, coordinator, _ = get_entry_and_data(hass, entry_id) + except ServiceValidationError as ex: + return { + "success": False, + "message": f"Invalid entry ID: {ex}", + } + + # Force refresh user data using the public method + try: + updated = await coordinator.refresh_user_data() + except ( + TibberPricesApiClientAuthenticationError, + TibberPricesApiClientCommunicationError, + TibberPricesApiClientError, + ) as ex: + return { + "success": False, + "message": f"API error refreshing user data: {ex!s}", + } + else: + if updated: + user_profile = coordinator.get_user_profile() + homes = coordinator.get_user_homes() + + return { + "success": True, + "message": "User data refreshed successfully", + "user_profile": user_profile, + "homes_count": len(homes), + "homes": homes, + "last_updated": user_profile.get("last_updated"), + } + return { + "success": False, + "message": "User data was already up to date", + }