"""Services for Tibber Prices integration.""" from __future__ import annotations import re from datetime import timedelta from typing import Any, Final import voluptuous as vol from homeassistant.core import HomeAssistant, ServiceCall, SupportsResponse, callback from homeassistant.exceptions import ServiceValidationError from homeassistant.helpers.entity_registry import async_get as async_get_entity_registry from homeassistant.util import dt as dt_util from .api import ( TibberPricesApiClientAuthenticationError, TibberPricesApiClientCommunicationError, TibberPricesApiClientError, ) from .const import ( CONF_PRICE_RATING_THRESHOLD_HIGH, CONF_PRICE_RATING_THRESHOLD_LOW, DEFAULT_PRICE_RATING_THRESHOLD_HIGH, DEFAULT_PRICE_RATING_THRESHOLD_LOW, DOMAIN, PRICE_LEVEL_CHEAP, PRICE_LEVEL_EXPENSIVE, PRICE_LEVEL_NORMAL, PRICE_LEVEL_VERY_CHEAP, PRICE_LEVEL_VERY_EXPENSIVE, PRICE_RATING_HIGH, PRICE_RATING_LOW, PRICE_RATING_NORMAL, format_price_unit_minor, get_translation, ) from .sensor.helpers import aggregate_level_data, aggregate_rating_data APEXCHARTS_YAML_SERVICE_NAME = "get_apexcharts_yaml" CHARTDATA_SERVICE_NAME = "get_chartdata" REFRESH_USER_DATA_SERVICE_NAME = "refresh_user_data" ATTR_DAY: Final = "day" ATTR_ENTRY_ID: Final = "entry_id" APEXCHARTS_SERVICE_SCHEMA: Final = vol.Schema( { vol.Required(ATTR_ENTRY_ID): str, vol.Optional("day", default="today"): vol.In(["yesterday", "today", "tomorrow"]), vol.Optional("level_type", default="rating_level"): vol.In(["rating_level", "level"]), } ) def _normalize_level_filter(value: list[str] | None) -> list[str] | None: """Convert level filter values to uppercase for case-insensitive comparison.""" if value is None: return None return [v.upper() for v in value] def _normalize_rating_level_filter(value: list[str] | None) -> list[str] | None: """Convert rating level filter values to uppercase for case-insensitive comparison.""" if value is None: return None return [v.upper() for v in value] CHARTDATA_SERVICE_SCHEMA: Final = vol.Schema( { vol.Required(ATTR_ENTRY_ID): str, vol.Optional(ATTR_DAY): vol.All(vol.Coerce(list), [vol.In(["yesterday", "today", "tomorrow"])]), vol.Optional("resolution", default="interval"): vol.In(["interval", "hourly"]), vol.Optional("output_format", default="array_of_objects"): vol.In(["array_of_objects", "array_of_arrays"]), vol.Optional("array_fields"): str, vol.Optional("minor_currency", default=False): bool, vol.Optional("round_decimals"): vol.All(vol.Coerce(int), vol.Range(min=0, max=10)), vol.Optional("include_level", default=False): bool, vol.Optional("include_rating_level", default=False): bool, vol.Optional("include_average", default=False): bool, vol.Optional("level_filter"): vol.All( vol.Coerce(list), _normalize_level_filter, [ vol.In( [ PRICE_LEVEL_VERY_CHEAP, PRICE_LEVEL_CHEAP, PRICE_LEVEL_NORMAL, PRICE_LEVEL_EXPENSIVE, PRICE_LEVEL_VERY_EXPENSIVE, ] ) ], ), vol.Optional("rating_level_filter"): vol.All( vol.Coerce(list), _normalize_rating_level_filter, [vol.In([PRICE_RATING_LOW, PRICE_RATING_NORMAL, PRICE_RATING_HIGH])], ), vol.Optional("insert_nulls", default="none"): vol.In(["none", "segments", "all"]), vol.Optional("add_trailing_null", default=False): bool, vol.Optional("period_filter"): vol.In(["best_price", "peak_price"]), vol.Optional("start_time_field", default="start_time"): str, vol.Optional("end_time_field", default="end_time"): str, vol.Optional("price_field", default="price_per_kwh"): str, vol.Optional("level_field", default="level"): str, vol.Optional("rating_level_field", default="rating_level"): str, vol.Optional("average_field", default="average"): str, vol.Optional("data_key", default="data"): str, } ) REFRESH_USER_DATA_SERVICE_SCHEMA: Final = vol.Schema( { vol.Required(ATTR_ENTRY_ID): str, } ) # --- Entry point: Service handler --- def _aggregate_hourly_exact( # noqa: PLR0913, PLR0912, PLR0915 intervals: list[dict], start_time_field: str, price_field: str, *, use_minor_currency: bool = False, round_decimals: int | None = None, include_level: bool = False, include_rating_level: bool = False, level_filter: list[str] | None = None, rating_level_filter: list[str] | None = None, include_average: bool = False, level_field: str = "level", rating_level_field: str = "rating_level", average_field: str = "average", day_average: float | None = None, threshold_low: float = DEFAULT_PRICE_RATING_THRESHOLD_LOW, threshold_high: float = DEFAULT_PRICE_RATING_THRESHOLD_HIGH, period_timestamps: set[str] | None = None, ) -> list[dict]: """ Aggregate 15-minute intervals to exact hourly averages. Each hour uses exactly 4 intervals (00:00, 00:15, 00:30, 00:45). Returns data points at the start of each hour. """ if not intervals: return [] hourly_data = [] i = 0 while i < len(intervals): interval = intervals[i] start_time_str = interval.get("startsAt") if not start_time_str: i += 1 continue # Parse the timestamp start_time = dt_util.parse_datetime(start_time_str) if not start_time: i += 1 continue # Check if this is the start of an hour (:00) if start_time.minute != 0: i += 1 continue # Collect 4 intervals for this hour (with optional filtering) hour_intervals = [] hour_interval_data = [] # Complete interval data for aggregation functions for j in range(4): if i + j < len(intervals): interval = intervals[i + j] # Apply period filter if specified (check startsAt timestamp) if period_timestamps is not None: interval_start = interval.get("startsAt") if interval_start and interval_start not in period_timestamps: continue # Apply level filter if specified if level_filter is not None and "level" in interval and interval["level"] not in level_filter: continue # Apply rating_level filter if specified if ( rating_level_filter is not None and "rating_level" in interval and interval["rating_level"] not in rating_level_filter ): continue price = interval.get("total") if price is not None: hour_intervals.append(price) hour_interval_data.append(interval) # Calculate average if we have data if hour_intervals: avg_price = sum(hour_intervals) / len(hour_intervals) # Convert to minor currency (cents/øre) if requested avg_price = round(avg_price * 100, 2) if use_minor_currency else round(avg_price, 4) # Apply custom rounding if specified if round_decimals is not None: avg_price = round(avg_price, round_decimals) data_point = {start_time_field: start_time_str, price_field: avg_price} # Add aggregated level using same logic as sensors if include_level and hour_interval_data: aggregated_level = aggregate_level_data(hour_interval_data) if aggregated_level: data_point[level_field] = aggregated_level.upper() # Convert back to uppercase # Add aggregated rating_level using same logic as sensors if include_rating_level and hour_interval_data: aggregated_rating = aggregate_rating_data(hour_interval_data, threshold_low, threshold_high) if aggregated_rating: data_point[rating_level_field] = aggregated_rating.upper() # Convert back to uppercase # Add average if requested if include_average and day_average is not None: data_point[average_field] = day_average hourly_data.append(data_point) # Move to next hour (skip 4 intervals) i += 4 return hourly_data def _get_period_data( # noqa: PLR0913, PLR0912, PLR0915 *, coordinator: Any, period_filter: str, days: list[str], output_format: str, minor_currency: bool, round_decimals: int | None, level_filter: list[str] | None, rating_level_filter: list[str] | None, include_level: bool, include_rating_level: bool, start_time_field: str, end_time_field: str, price_field: str, level_field: str, rating_level_field: str, data_key: str, add_trailing_null: bool, ) -> dict[str, Any]: """ Get period summary data instead of interval data. When period_filter is specified, returns the precomputed period summaries from the coordinator instead of filtering intervals. Note: Period prices (price_avg) are stored in minor currency units (ct/øre). They are converted to major currency unless minor_currency=True. Args: coordinator: Data coordinator with period summaries period_filter: "best_price" or "peak_price" days: List of days to include output_format: "array_of_objects" or "array_of_arrays" minor_currency: If False, convert prices from minor to major units round_decimals: Optional decimal rounding level_filter: Optional level filter rating_level_filter: Optional rating level filter include_level: Whether to include level field in output include_rating_level: Whether to include rating_level field in output start_time_field: Custom name for start time field end_time_field: Custom name for end time field price_field: Custom name for price field level_field: Custom name for level field rating_level_field: Custom name for rating_level field data_key: Top-level key name in response add_trailing_null: Whether to add trailing null point Returns: Dictionary with period data in requested format """ periods_data = coordinator.data.get("periods", {}) period_data = periods_data.get(period_filter) if not period_data: return {data_key: []} period_summaries = period_data.get("periods", []) if not period_summaries: return {data_key: []} chart_data = [] # Filter periods by day if requested filtered_periods = [] if days: # Build set of allowed dates allowed_dates = set() for day in days: # Map day names to actual dates from coordinator price_info = coordinator.data.get("priceInfo", {}) day_prices = price_info.get(day, []) if day_prices: # Extract date from first interval first_interval = day_prices[0] starts_at = first_interval.get("startsAt") if starts_at: dt = dt_util.parse_datetime(starts_at) if dt: dt = dt_util.as_local(dt) allowed_dates.add(dt.date()) # Filter periods to those within allowed dates for period in period_summaries: start = period.get("start") if start and start.date() in allowed_dates: filtered_periods.append(period) else: filtered_periods = period_summaries # Apply level and rating_level filters for period in filtered_periods: # Apply level filter (normalize to uppercase for comparison) if level_filter and "level" in period and period["level"].upper() not in level_filter: continue # Apply rating_level filter (normalize to uppercase for comparison) if ( rating_level_filter and "rating_level" in period and period["rating_level"].upper() not in rating_level_filter ): continue # Build data point based on output format if output_format == "array_of_objects": # Map period fields to custom field names # Period has: start, end, level, rating_level, price_avg, price_min, price_max data_point = {} # Start time data_point[start_time_field] = period["start"] # End time data_point[end_time_field] = period.get("end") # Price (use price_avg from period, stored in minor units) price_avg = period.get("price_avg", 0.0) # Convert to major currency unless minor_currency=True if not minor_currency: price_avg = price_avg / 100 if round_decimals is not None: price_avg = round(price_avg, round_decimals) data_point[price_field] = price_avg # Level (only if requested and present) if include_level and "level" in period: data_point[level_field] = period["level"].upper() # Rating level (only if requested and present) if include_rating_level and "rating_level" in period: data_point[rating_level_field] = period["rating_level"].upper() chart_data.append(data_point) else: # array_of_arrays # For array_of_arrays, include: [start, price_avg] price_avg = period.get("price_avg", 0.0) # Convert to major currency unless minor_currency=True if not minor_currency: price_avg = price_avg / 100 if round_decimals is not None: price_avg = round(price_avg, round_decimals) chart_data.append([period["start"], price_avg]) # Add trailing null point if requested if add_trailing_null and chart_data: if output_format == "array_of_objects": null_point = {start_time_field: None, end_time_field: None} for field in [price_field, level_field, rating_level_field]: null_point[field] = None chart_data.append(null_point) else: # array_of_arrays chart_data.append([None, None]) return {data_key: chart_data} async def _get_chartdata(call: ServiceCall) -> dict[str, Any]: # noqa: PLR0912, PLR0915, C901 """Return price data in a simple chart-friendly format similar to Tibber Core integration.""" hass = call.hass entry_id_raw = call.data.get(ATTR_ENTRY_ID) if entry_id_raw is None: raise ServiceValidationError(translation_domain=DOMAIN, translation_key="missing_entry_id") entry_id: str = str(entry_id_raw) days_raw = call.data.get(ATTR_DAY) # If no day specified, return all available data (today + tomorrow) if days_raw is None: days = ["today", "tomorrow"] # Convert single string to list for uniform processing elif isinstance(days_raw, str): days = [days_raw] else: days = days_raw start_time_field = call.data.get("start_time_field", "start_time") end_time_field = call.data.get("end_time_field", "end_time") price_field = call.data.get("price_field", "price_per_kwh") level_field = call.data.get("level_field", "level") rating_level_field = call.data.get("rating_level_field", "rating_level") average_field = call.data.get("average_field", "average") data_key = call.data.get("data_key", "data") resolution = call.data.get("resolution", "interval") output_format = call.data.get("output_format", "array_of_objects") minor_currency = call.data.get("minor_currency", False) round_decimals = call.data.get("round_decimals") include_level = call.data.get("include_level", False) include_rating_level = call.data.get("include_rating_level", False) include_average = call.data.get("include_average", False) insert_nulls = call.data.get("insert_nulls", "none") add_trailing_null = call.data.get("add_trailing_null", False) period_filter = call.data.get("period_filter") # Filter values are already normalized to uppercase by schema validators level_filter = call.data.get("level_filter") rating_level_filter = call.data.get("rating_level_filter") # If array_fields is specified, implicitly enable fields that are used array_fields_template = call.data.get("array_fields") if array_fields_template and output_format == "array_of_arrays": if level_field in array_fields_template: include_level = True if rating_level_field in array_fields_template: include_rating_level = True if average_field in array_fields_template: include_average = True _, coordinator, _ = _get_entry_and_data(hass, entry_id) # Get thresholds from config for rating aggregation threshold_low = coordinator.config_entry.options.get( CONF_PRICE_RATING_THRESHOLD_LOW, DEFAULT_PRICE_RATING_THRESHOLD_LOW ) threshold_high = coordinator.config_entry.options.get( CONF_PRICE_RATING_THRESHOLD_HIGH, DEFAULT_PRICE_RATING_THRESHOLD_HIGH ) # === SPECIAL HANDLING: Period Filter === # When period_filter is set, return period summaries instead of interval data # Period summaries are already complete objects with aggregated data if period_filter: return _get_period_data( coordinator=coordinator, period_filter=period_filter, days=days, output_format=output_format, minor_currency=minor_currency, round_decimals=round_decimals, level_filter=level_filter, rating_level_filter=rating_level_filter, include_level=include_level, include_rating_level=include_rating_level, start_time_field=start_time_field, end_time_field=end_time_field, price_field=price_field, level_field=level_field, rating_level_field=rating_level_field, data_key=data_key, add_trailing_null=add_trailing_null, ) # === NORMAL HANDLING: Interval Data === # Get price data for all requested days price_info = coordinator.data.get("priceInfo", {}) chart_data = [] # Build set of timestamps that match period_filter if specified period_timestamps = None if period_filter: period_timestamps = set() periods_data = coordinator.data.get("periods", {}) period_data = periods_data.get(period_filter) if period_data: period_summaries = period_data.get("periods", []) # Period summaries don't contain intervals, only start/end timestamps # Build set of all 15-minute intervals within period ranges for period_summary in period_summaries: start = period_summary.get("start") end = period_summary.get("end") if start and end: # Generate all 15-minute timestamps within this period current = start while current < end: period_timestamps.add(current.isoformat()) current = current + timedelta(minutes=15) # Collect all timestamps if insert_nulls='all' (needed to insert NULLs for missing filter matches) all_timestamps = set() if insert_nulls == "all" and (level_filter or rating_level_filter): for day in days: day_prices = price_info.get(day, []) for interval in day_prices: start_time = interval.get("startsAt") if start_time: all_timestamps.add(start_time) all_timestamps = sorted(all_timestamps) # Calculate average if requested day_averages = {} if include_average: for day in days: day_prices = price_info.get(day, []) if day_prices: prices = [p["total"] for p in day_prices if p.get("total") is not None] if prices: avg = sum(prices) / len(prices) # Apply same transformations as to regular prices avg = round(avg * 100, 2) if minor_currency else round(avg, 4) if round_decimals is not None: avg = round(avg, round_decimals) day_averages[day] = avg for day in days: day_prices = price_info.get(day, []) if resolution == "interval": # Original 15-minute intervals if insert_nulls == "all" and (level_filter or rating_level_filter): # Mode 'all': Insert NULL for all timestamps where filter doesn't match # Build a map of timestamp -> interval for quick lookup interval_map = { interval.get("startsAt"): interval for interval in day_prices if interval.get("startsAt") } # Process all timestamps, filling gaps with NULL for start_time in all_timestamps: interval = interval_map.get(start_time) if interval is None: # No data for this timestamp - skip entirely continue price = interval.get("total") if price is None: continue # Check if this interval matches the filter matches_filter = False if level_filter and "level" in interval: matches_filter = interval["level"] in level_filter elif rating_level_filter and "rating_level" in interval: matches_filter = interval["rating_level"] in rating_level_filter # If filter is set but doesn't match, insert NULL price if not matches_filter: price = None elif price is not None: # Convert to minor currency (cents/øre) if requested price = round(price * 100, 2) if minor_currency else round(price, 4) # Apply custom rounding if specified if round_decimals is not None: price = round(price, round_decimals) data_point = {start_time_field: start_time, price_field: price} # Add level if requested (only when price is not NULL) if include_level and "level" in interval and price is not None: data_point[level_field] = interval["level"] # Add rating_level if requested (only when price is not NULL) if include_rating_level and "rating_level" in interval and price is not None: data_point[rating_level_field] = interval["rating_level"] # Add average if requested if include_average and day in day_averages: data_point[average_field] = day_averages[day] chart_data.append(data_point) elif insert_nulls == "segments" and (level_filter or rating_level_filter): # Mode 'segments': Add NULL points at segment boundaries for clean gaps # Determine which field to check based on filter type filter_field = "rating_level" if rating_level_filter else "level" filter_values = rating_level_filter if rating_level_filter else level_filter for i in range(len(day_prices) - 1): interval = day_prices[i] next_interval = day_prices[i + 1] start_time = interval.get("startsAt") price = interval.get("total") next_start_time = next_interval.get("startsAt") if start_time is None or price is None: continue interval_value = interval.get(filter_field) next_value = next_interval.get(filter_field) # Check if current interval matches filter if interval_value in filter_values: # Convert price converted_price = round(price * 100, 2) if minor_currency else round(price, 4) if round_decimals is not None: converted_price = round(converted_price, round_decimals) # Add current point data_point = {start_time_field: start_time, price_field: converted_price} if include_level and "level" in interval: data_point[level_field] = interval["level"] if include_rating_level and "rating_level" in interval: data_point[rating_level_field] = interval["rating_level"] if include_average and day in day_averages: data_point[average_field] = day_averages[day] chart_data.append(data_point) # Check if next interval is different level (segment boundary) if next_value != interval_value: # Hold current price until next timestamp (stepline effect) hold_point = {start_time_field: next_start_time, price_field: converted_price} if include_level and "level" in interval: hold_point[level_field] = interval["level"] if include_rating_level and "rating_level" in interval: hold_point[rating_level_field] = interval["rating_level"] if include_average and day in day_averages: hold_point[average_field] = day_averages[day] chart_data.append(hold_point) # Add NULL point to create gap null_point = {start_time_field: next_start_time, price_field: None} chart_data.append(null_point) # Handle last interval of the day - extend to midnight if day_prices: last_interval = day_prices[-1] last_start_time = last_interval.get("startsAt") last_price = last_interval.get("total") last_value = last_interval.get(filter_field) if last_start_time and last_price is not None and last_value in filter_values: # Parse timestamp and calculate midnight of next day last_dt = dt_util.parse_datetime(last_start_time) if last_dt: last_dt = dt_util.as_local(last_dt) # Calculate next day at 00:00 next_day = last_dt.replace(hour=0, minute=0, second=0, microsecond=0) next_day = next_day + timedelta(days=1) midnight_timestamp = next_day.isoformat() # Try to get real price from tomorrow's first interval next_day_name = None if day == "yesterday": next_day_name = "today" elif day == "today": next_day_name = "tomorrow" # For "tomorrow", we don't have a "day after tomorrow" midnight_price = None midnight_interval = None if next_day_name: next_day_prices = price_info.get(next_day_name, []) if next_day_prices: first_next = next_day_prices[0] first_next_value = first_next.get(filter_field) # Only use tomorrow's price if it matches the same filter if first_next_value == last_value: midnight_price = first_next.get("total") midnight_interval = first_next # Fallback: use last interval's price if no tomorrow data or different level if midnight_price is None: midnight_price = last_price midnight_interval = last_interval # Convert price converted_price = ( round(midnight_price * 100, 2) if minor_currency else round(midnight_price, 4) ) if round_decimals is not None: converted_price = round(converted_price, round_decimals) # Add point at midnight with appropriate price (extends graph to end of day) end_point = {start_time_field: midnight_timestamp, price_field: converted_price} if midnight_interval is not None: if include_level and "level" in midnight_interval: end_point[level_field] = midnight_interval["level"] if include_rating_level and "rating_level" in midnight_interval: end_point[rating_level_field] = midnight_interval["rating_level"] if include_average and day in day_averages: end_point[average_field] = day_averages[day] chart_data.append(end_point) else: # Mode 'none' (default): Only return matching intervals, no NULL insertion for interval in day_prices: start_time = interval.get("startsAt") price = interval.get("total") if start_time is not None and price is not None: # Apply period filter if specified if ( period_filter is not None and period_timestamps is not None and start_time not in period_timestamps ): continue # Apply level filter if specified if level_filter is not None and "level" in interval and interval["level"] not in level_filter: continue # Apply rating_level filter if specified if ( rating_level_filter is not None and "rating_level" in interval and interval["rating_level"] not in rating_level_filter ): continue # Convert to minor currency (cents/øre) if requested price = round(price * 100, 2) if minor_currency else round(price, 4) # Apply custom rounding if specified if round_decimals is not None: price = round(price, round_decimals) data_point = {start_time_field: start_time, price_field: price} # Add level if requested if include_level and "level" in interval: data_point[level_field] = interval["level"] # Add rating_level if requested if include_rating_level and "rating_level" in interval: data_point[rating_level_field] = interval["rating_level"] # Add average if requested if include_average and day in day_averages: data_point[average_field] = day_averages[day] chart_data.append(data_point) elif resolution == "hourly": # Hourly averages (4 intervals per hour: :00, :15, :30, :45) chart_data.extend( _aggregate_hourly_exact( day_prices, start_time_field, price_field, use_minor_currency=minor_currency, round_decimals=round_decimals, include_level=include_level, include_rating_level=include_rating_level, level_filter=level_filter, rating_level_filter=rating_level_filter, include_average=include_average, level_field=level_field, rating_level_field=rating_level_field, average_field=average_field, day_average=day_averages.get(day), threshold_low=threshold_low, period_timestamps=period_timestamps, threshold_high=threshold_high, ) ) # Convert to array of arrays format if requested if output_format == "array_of_arrays": array_fields_template = call.data.get("array_fields") # Default: nur timestamp und price if not array_fields_template: array_fields_template = f"{{{start_time_field}}}, {{{price_field}}}" # Parse template to extract field names field_pattern = re.compile(r"\{([^}]+)\}") field_names = field_pattern.findall(array_fields_template) if not field_names: raise ServiceValidationError( translation_domain=DOMAIN, translation_key="invalid_array_fields", translation_placeholders={"template": array_fields_template}, ) # Convert to [[field1, field2, ...], ...] format points = [] for item in chart_data: row = [] for field_name in field_names: # Get value from item, or None if field doesn't exist value = item.get(field_name) row.append(value) points.append(row) # Add final null point for stepline rendering if requested # (some chart libraries need this to prevent extrapolation to viewport edge) if add_trailing_null and points: null_row = [points[-1][0]] + [None] * (len(field_names) - 1) points.append(null_row) return {data_key: points} # Add trailing null point for array_of_objects format if requested if add_trailing_null and chart_data: # Create a null point with only timestamp from last item, all other fields as None last_item = chart_data[-1] null_point = {start_time_field: last_item.get(start_time_field)} # Set all other potential fields to None for field in [price_field, level_field, rating_level_field, average_field]: if field in last_item: null_point[field] = None chart_data.append(null_point) return {data_key: chart_data} def _get_level_translation(level_key: str, level_type: str, language: str) -> str: """Get translated name for a price level or rating level.""" level_key_lower = level_key.lower() # Use correct translation key based on level_type if level_type == "rating_level": name = get_translation(["selector", "rating_level_filter", "options", level_key_lower], language) else: name = get_translation(["selector", "level_filter", "options", level_key_lower], language) # Fallback to original key if translation not found return name or level_key async def _get_apexcharts_yaml(call: ServiceCall) -> dict[str, Any]: """Return a YAML snippet for an ApexCharts card using the get_apexcharts_data service for each level.""" hass = call.hass entry_id_raw = call.data.get(ATTR_ENTRY_ID) if entry_id_raw is None: raise ServiceValidationError(translation_domain=DOMAIN, translation_key="missing_entry_id") entry_id: str = str(entry_id_raw) day = call.data.get("day", "today") level_type = call.data.get("level_type", "rating_level") # Get user's language from hass config user_language = hass.config.language or "en" # Get coordinator to access price data (for currency) _, coordinator, _ = _get_entry_and_data(hass, entry_id) price_info = coordinator.data.get("priceInfo", {}) currency = price_info.get("currency", "EUR") price_unit = format_price_unit_minor(currency) # Get a sample entity_id for the series (first sensor from this entry) entity_registry = async_get_entity_registry(hass) sample_entity = None for entity in entity_registry.entities.values(): if entity.config_entry_id == entry_id and entity.domain == "sensor": sample_entity = entity.entity_id break if level_type == "rating_level": series_levels = [ (PRICE_RATING_LOW, "#2ecc71"), (PRICE_RATING_NORMAL, "#f1c40f"), (PRICE_RATING_HIGH, "#e74c3c"), ] else: series_levels = [ (PRICE_LEVEL_VERY_CHEAP, "#2ecc71"), (PRICE_LEVEL_CHEAP, "#27ae60"), (PRICE_LEVEL_NORMAL, "#f1c40f"), (PRICE_LEVEL_EXPENSIVE, "#e67e22"), (PRICE_LEVEL_VERY_EXPENSIVE, "#e74c3c"), ] series = [] for level_key, color in series_levels: # Get translated name for the level using helper function name = _get_level_translation(level_key, level_type, user_language) # Use server-side insert_nulls='segments' for clean gaps if level_type == "rating_level": filter_param = f"rating_level_filter: ['{level_key}']" else: filter_param = f"level_filter: ['{level_key}']" data_generator = ( f"const response = await hass.callWS({{ " f"type: 'call_service', " f"domain: 'tibber_prices', " f"service: 'get_chartdata', " f"return_response: true, " f"service_data: {{ entry_id: '{entry_id}', day: ['{day}'], {filter_param}, " f"output_format: 'array_of_arrays', insert_nulls: 'segments', minor_currency: true }} }}); " f"return response.response.data;" ) # Only show extremas for HIGH and LOW levels (not NORMAL) show_extremas = level_key != "NORMAL" series.append( { "entity": sample_entity or "sensor.tibber_prices", "name": name, "type": "area", "color": color, "yaxis_id": "price", "show": {"extremas": show_extremas, "legend_value": False}, "data_generator": data_generator, "stroke_width": 1, } ) # Get translated title based on level_type title_key = "title_rating_level" if level_type == "rating_level" else "title_level" title = get_translation(["apexcharts", title_key], user_language) or ( "Price Phases Daily Progress" if level_type == "rating_level" else "Price Level" ) # Add translated day to title day_translated = get_translation(["selector", "day", "options", day], user_language) or day.capitalize() title = f"{title} - {day_translated}" # Configure span based on selected day if day == "yesterday": span_config = {"start": "day", "offset": "-1d"} elif day == "tomorrow": span_config = {"start": "day", "offset": "+1d"} else: # today span_config = {"start": "day"} return { "type": "custom:apexcharts-card", "update_interval": "5m", "span": span_config, "header": { "show": True, "title": title, "show_states": False, }, "apex_config": { "chart": { "animations": {"enabled": False}, "toolbar": {"show": True, "tools": {"zoom": True, "pan": True}}, "zoom": {"enabled": True}, }, "stroke": {"curve": "stepline", "width": 2}, "fill": { "type": "gradient", "opacity": 0.4, "gradient": { "shade": "dark", "type": "vertical", "shadeIntensity": 0.5, "opacityFrom": 0.7, "opacityTo": 0.2, }, }, "dataLabels": {"enabled": False}, "tooltip": { "x": {"format": "HH:mm"}, "y": {"title": {"formatter": f"function() {{ return '{price_unit}'; }}"}}, }, "legend": { "show": True, "position": "top", "horizontalAlign": "left", "markers": {"radius": 2}, }, "grid": { "show": True, "borderColor": "#40475D", "strokeDashArray": 4, "xaxis": {"lines": {"show": True}}, "yaxis": {"lines": {"show": True}}, }, "markers": {"size": 0}, }, "yaxis": [ { "id": "price", "decimals": 2, "min": 0, "apex_config": {"title": {"text": price_unit}}, }, ], "now": {"show": True, "color": "#8e24aa", "label": "🕒 LIVE"}, "all_series_config": { "stroke_width": 1, "group_by": {"func": "raw", "duration": "15min"}, }, "series": series, } async def _refresh_user_data(call: ServiceCall) -> dict[str, Any]: """Refresh user data for a specific config entry and return updated information.""" entry_id = call.data.get(ATTR_ENTRY_ID) hass = call.hass if not entry_id: return { "success": False, "message": "Entry ID is required", } # Get the entry and coordinator try: _, coordinator, _ = _get_entry_and_data(hass, entry_id) except ServiceValidationError as ex: return { "success": False, "message": f"Invalid entry ID: {ex}", } # Force refresh user data using the public method try: updated = await coordinator.refresh_user_data() except ( TibberPricesApiClientAuthenticationError, TibberPricesApiClientCommunicationError, TibberPricesApiClientError, ) as ex: return { "success": False, "message": f"API error refreshing user data: {ex!s}", } else: if updated: user_profile = coordinator.get_user_profile() homes = coordinator.get_user_homes() return { "success": True, "message": "User data refreshed successfully", "user_profile": user_profile, "homes_count": len(homes), "homes": homes, "last_updated": user_profile.get("last_updated"), } return { "success": False, "message": "User data was already up to date", } # --- Helpers --- def _get_entry_and_data(hass: HomeAssistant, entry_id: str) -> tuple[Any, Any, dict]: """Validate entry and extract coordinator and data.""" if not entry_id: raise ServiceValidationError(translation_domain=DOMAIN, translation_key="missing_entry_id") entry = next( (e for e in hass.config_entries.async_entries(DOMAIN) if e.entry_id == entry_id), None, ) if not entry or not hasattr(entry, "runtime_data") or not entry.runtime_data: raise ServiceValidationError(translation_domain=DOMAIN, translation_key="invalid_entry_id") coordinator = entry.runtime_data.coordinator data = coordinator.data or {} return entry, coordinator, data # --- Service registration --- @callback def async_setup_services(hass: HomeAssistant) -> None: """Set up services for Tibber Prices integration.""" hass.services.async_register( DOMAIN, APEXCHARTS_YAML_SERVICE_NAME, _get_apexcharts_yaml, schema=APEXCHARTS_SERVICE_SCHEMA, supports_response=SupportsResponse.ONLY, ) hass.services.async_register( DOMAIN, CHARTDATA_SERVICE_NAME, _get_chartdata, schema=CHARTDATA_SERVICE_SCHEMA, supports_response=SupportsResponse.ONLY, ) hass.services.async_register( DOMAIN, REFRESH_USER_DATA_SERVICE_NAME, _refresh_user_data, schema=REFRESH_USER_DATA_SERVICE_SCHEMA, supports_response=SupportsResponse.ONLY, )