hass.tibber_prices/custom_components/tibber_prices/sensor/chart_data.py
Julian Pawlowski 625bc222ca refactor(coordinator): centralize time operations through TimeService
Introduce TimeService as single source of truth for all datetime operations,
replacing direct dt_util calls throughout the codebase. This establishes
consistent time context across update cycles and enables future time-travel
testing capability.

Core changes:
- NEW: coordinator/time_service.py with timezone-aware datetime API
- Coordinator now creates TimeService per update cycle, passes to calculators
- Timer callbacks (#2, #3) inject TimeService into entity update flow
- All sensor calculators receive TimeService via coordinator reference
- Attribute builders accept time parameter for timestamp calculations

Key patterns replaced:
- dt_util.now() → time.now() (single reference time per cycle)
- dt_util.parse_datetime() + as_local() → time.get_interval_time()
- Manual interval arithmetic → time.get_interval_offset_time()
- Manual day boundaries → time.get_day_boundaries()
- round_to_nearest_quarter_hour() → time.round_to_nearest_quarter()

Import cleanup:
- Removed dt_util imports from ~30 files (calculators, attributes, utils)
- Restricted dt_util to 3 modules: time_service.py (operations), api/client.py
  (rate limiting), entity_utils/icons.py (cosmetic updates)
- datetime/timedelta only for TYPE_CHECKING (type hints) or duration arithmetic

Interval resolution abstraction:
- Removed hardcoded MINUTES_PER_INTERVAL constant from 10+ files
- New methods: time.minutes_to_intervals(), time.get_interval_duration()
- Supports future 60-minute resolution (legacy data) via TimeService config

Timezone correctness:
- API timestamps (startsAt) already localized by data transformation
- TimeService operations preserve HA user timezone throughout
- DST transitions handled via get_expected_intervals_for_day() (future use)

Timestamp ordering preserved:
- Attribute builders generate default timestamp (rounded quarter)
- Sensors override when needed (next interval, daily midnight, etc.)
- Platform ensures timestamp stays FIRST in attribute dict

Timer integration:
- Timer #2 (quarter-hour): Creates TimeService, calls _handle_time_sensitive_update(time)
- Timer #3 (30-second): Creates TimeService, calls _handle_minute_update(time)
- Consistent time reference for all entities in same update batch

Time-travel readiness:
- TimeService.with_reference_time() enables time injection (not yet used)
- All calculations use time.now() → easy to simulate past/future states
- Foundation for debugging period calculations with historical data

Impact: Eliminates timestamp drift within update cycles (previously 60+ independent
dt_util.now() calls could differ by milliseconds). Establishes architecture for
time-based testing and debugging features.
2025-11-19 18:36:12 +00:00

144 lines
4.4 KiB
Python

"""Chart data export functionality for Tibber Prices sensors."""
from __future__ import annotations
from typing import TYPE_CHECKING
import yaml
from custom_components.tibber_prices.const import CONF_CHART_DATA_CONFIG, DOMAIN
if TYPE_CHECKING:
from datetime import datetime
from custom_components.tibber_prices.coordinator import TibberPricesDataUpdateCoordinator
from custom_components.tibber_prices.data import TibberPricesConfigEntry
from homeassistant.core import HomeAssistant
async def call_chartdata_service_async(
hass: HomeAssistant,
coordinator: TibberPricesDataUpdateCoordinator,
config_entry: TibberPricesConfigEntry,
) -> tuple[dict | None, str | None]:
"""
Call get_chartdata service with user-configured YAML (async).
Returns:
Tuple of (response, error_message).
If successful: (response_dict, None)
If failed: (None, error_string)
"""
# Get user-configured YAML
yaml_config = config_entry.options.get(CONF_CHART_DATA_CONFIG, "")
# Parse YAML if provided, otherwise use empty dict (service defaults)
service_params = {}
if yaml_config and yaml_config.strip():
try:
parsed = yaml.safe_load(yaml_config)
# Ensure we have a dict (yaml.safe_load can return str, int, etc.)
if isinstance(parsed, dict):
service_params = parsed
else:
coordinator.logger.warning(
"YAML configuration must be a dictionary, got %s. Using service defaults.",
type(parsed).__name__,
)
service_params = {}
except yaml.YAMLError as err:
coordinator.logger.warning(
"Invalid chart data YAML configuration: %s. Using service defaults.",
err,
)
service_params = {} # Fall back to service defaults
# Add required entry_id parameter
service_params["entry_id"] = config_entry.entry_id
# Call get_chartdata service using official HA service system
try:
response = await hass.services.async_call(
DOMAIN,
"get_chartdata",
service_params,
blocking=True,
return_response=True,
)
except Exception as ex:
coordinator.logger.exception("Chart data service call failed")
return None, str(ex)
else:
return response, None
def get_chart_data_state(
chart_data_response: dict | None,
chart_data_error: str | None,
) -> str | None:
"""
Return state for chart_data_export sensor.
Args:
chart_data_response: Last service response (or None)
chart_data_error: Last error message (or None)
Returns:
"error" if error occurred
"ready" if data available
"pending" if no data yet
"""
if chart_data_error:
return "error"
if chart_data_response:
return "ready"
return "pending"
def build_chart_data_attributes(
chart_data_response: dict | None,
chart_data_last_update: datetime | None,
chart_data_error: str | None,
) -> dict[str, object] | None:
"""
Return chart data from last service call as attributes with metadata.
Attribute order: timestamp, error (if any), service data (at the end).
Args:
chart_data_response: Last service response
chart_data_last_update: Timestamp of last update
chart_data_error: Error message if service call failed
Returns:
Dict with timestamp, optional error, and service response data.
"""
# Build base attributes with metadata FIRST
attributes: dict[str, object] = {
"timestamp": chart_data_last_update,
}
# Add error message if service call failed
if chart_data_error:
attributes["error"] = chart_data_error
if not chart_data_response:
# No data - only metadata (timestamp, error)
return attributes
# Service data goes LAST - after metadata
if isinstance(chart_data_response, dict):
if len(chart_data_response) > 1:
# Multiple keys → wrap to prevent collision with metadata
attributes["data"] = chart_data_response
else:
# Single key → safe to merge directly
attributes.update(chart_data_response)
else:
# If response is array/list/primitive, wrap it in "data" key
attributes["data"] = chart_data_response
return attributes