fix config flow

This commit is contained in:
Julian Pawlowski 2025-11-02 15:46:13 +00:00
parent 772f9dd310
commit 0ffa17679b
7 changed files with 300 additions and 48 deletions

View file

@ -16,7 +16,7 @@ from homeassistant.helpers.storage import Store
from homeassistant.loader import async_get_loaded_integration
from .api import TibberPricesApiClient
from .const import DOMAIN, LOGGER, async_load_translations
from .const import DOMAIN, LOGGER, async_load_standard_translations, async_load_translations
from .coordinator import STORAGE_VERSION, TibberPricesDataUpdateCoordinator
from .data import TibberPricesData
from .services import async_setup_services
@ -41,10 +41,12 @@ async def async_setup_entry(
LOGGER.debug(f"[tibber_prices] async_setup_entry called for entry_id={entry.entry_id}")
# Preload translations to populate the cache
await async_load_translations(hass, "en")
await async_load_standard_translations(hass, "en")
# Try to load translations for the user's configured language if not English
if hass.config.language and hass.config.language != "en":
await async_load_translations(hass, hass.config.language)
await async_load_standard_translations(hass, hass.config.language)
# Register services when a config entry is loaded
async_setup_services(hass)

View file

@ -23,6 +23,7 @@ from homeassistant.helpers.selector import (
NumberSelector,
NumberSelectorConfig,
NumberSelectorMode,
SelectOptionDict,
SelectSelector,
SelectSelectorConfig,
SelectSelectorMode,
@ -59,6 +60,11 @@ class TibberPricesFlowHandler(ConfigFlow, domain=DOMAIN):
"""Initialize the config flow."""
super().__init__()
self._reauth_entry: ConfigEntry | None = None
self._viewer: dict | None = None
self._access_token: str | None = None
self._user_name: str | None = None
self._user_login: str | None = None
self._user_id: str | None = None
@classmethod
@callback
@ -109,21 +115,18 @@ class TibberPricesFlowHandler(ConfigFlow, domain=DOMAIN):
LOGGER.debug("Viewer data received: %s", viewer)
data = {CONF_ACCESS_TOKEN: user_input[CONF_ACCESS_TOKEN], "homes": homes}
await self.async_set_unique_id(unique_id=str(user_id))
self._abort_if_unique_id_configured()
return self.async_create_entry(
title=user_name,
data=data,
description=f"{user_login} ({user_id})",
description_placeholders={
"user_id": user_id,
"user_name": user_name,
"user_login": user_login,
},
)
# Store viewer data in the flow for use in the next step
self._viewer = viewer
self._access_token = user_input[CONF_ACCESS_TOKEN]
self._user_name = user_name
self._user_login = user_login
self._user_id = user_id
# Move to home selection step
return await self.async_step_select_home()
return self.async_show_form(
step_id="user",
@ -142,6 +145,74 @@ class TibberPricesFlowHandler(ConfigFlow, domain=DOMAIN):
errors=_errors,
)
async def async_step_select_home(self, user_input: dict | None = None) -> ConfigFlowResult:
"""Handle home selection during initial setup."""
homes = self._viewer.get("homes", []) if self._viewer else []
if not homes:
return self.async_abort(reason="unknown")
if user_input is not None:
selected_home_id = user_input["home_id"]
selected_home = next((home for home in homes if home["id"] == selected_home_id), None)
if not selected_home:
return self.async_abort(reason="unknown")
data = {
CONF_ACCESS_TOKEN: self._access_token or "",
"home_id": selected_home_id,
"home_data": selected_home,
"homes": homes,
}
return self.async_create_entry(
title=self._user_name or "Unknown User",
data=data,
description=f"{self._user_login} ({self._user_id})",
)
home_options = [
SelectOptionDict(
value=home["id"],
label=self._get_home_title(home),
)
for home in homes
]
return self.async_show_form(
step_id="select_home",
data_schema=vol.Schema(
{
vol.Required("home_id"): SelectSelector(
SelectSelectorConfig(
options=home_options,
mode=SelectSelectorMode.DROPDOWN,
)
)
}
),
)
@staticmethod
def _get_home_title(home: dict) -> str:
"""Generate a user-friendly title for a home."""
title = home.get("appNickname")
if title:
return title
address = home.get("address", {})
if address:
parts = []
if address.get("address1"):
parts.append(address["address1"])
if address.get("city"):
parts.append(address["city"])
if parts:
return ", ".join(parts)
return home.get("id", "Unknown Home")
async def _get_viewer_details(self, access_token: str) -> dict:
"""Validate credentials and return information about the account (viewer object)."""
client = TibberPricesApiClient(
@ -249,8 +320,6 @@ class TibberPricesSubentryFlowHandler(ConfigSubentryFlow):
if not available_homes:
return self.async_abort(reason="no_available_homes")
from homeassistant.helpers.selector import SelectOptionDict
home_options = [
SelectOptionDict(
value=home["id"],

View file

@ -73,9 +73,15 @@ LOGGER = logging.getLogger(__package__)
# Path to custom translations directory
CUSTOM_TRANSLATIONS_DIR = Path(__file__).parent / "custom_translations"
# Path to standard translations directory
TRANSLATIONS_DIR = Path(__file__).parent / "translations"
# Cache for translations to avoid repeated file reads
_TRANSLATIONS_CACHE: dict[str, dict] = {}
# Cache for standard translations (config flow, home_types, etc.)
_STANDARD_TRANSLATIONS_CACHE: dict[str, dict] = {}
async def async_load_translations(hass: HomeAssistant, language: str) -> dict:
"""
@ -139,6 +145,67 @@ async def async_load_translations(hass: HomeAssistant, language: str) -> dict:
return empty_cache
async def async_load_standard_translations(hass: HomeAssistant, language: str) -> dict:
"""
Load standard translations from the translations directory asynchronously.
These are the config flow and home_types translations used in the UI.
Args:
hass: HomeAssistant instance
language: The language code to load
Returns:
The loaded translations as a dictionary
"""
cache_key = f"{DOMAIN}_standard_translations_{language}"
# Check if we have an instance in hass.data
if cache_key in hass.data:
return hass.data[cache_key]
# Check the module-level cache
if language in _STANDARD_TRANSLATIONS_CACHE:
return _STANDARD_TRANSLATIONS_CACHE[language]
# Determine the file path
file_path = TRANSLATIONS_DIR / f"{language}.json"
if not file_path.exists():
# Fall back to English if requested language not found
file_path = TRANSLATIONS_DIR / "en.json"
if not file_path.exists():
LOGGER.debug("No standard translations found at %s", file_path)
empty_cache = {}
_STANDARD_TRANSLATIONS_CACHE[language] = empty_cache
hass.data[cache_key] = empty_cache
return empty_cache
try:
# Read the file asynchronously
async with aiofiles.open(file_path, encoding="utf-8") as f:
content = await f.read()
translations = json.loads(content)
# Store in both caches for future calls
_STANDARD_TRANSLATIONS_CACHE[language] = translations
hass.data[cache_key] = translations
return translations
except (OSError, json.JSONDecodeError) as err:
LOGGER.warning("Error loading standard translations file: %s", err)
empty_cache = {}
_STANDARD_TRANSLATIONS_CACHE[language] = empty_cache
hass.data[cache_key] = empty_cache
return empty_cache
except Exception: # pylint: disable=broad-except
LOGGER.exception("Unexpected error loading standard translations")
empty_cache = {}
_STANDARD_TRANSLATIONS_CACHE[language] = empty_cache
hass.data[cache_key] = empty_cache
return empty_cache
async def async_get_translation(
hass: HomeAssistant,
path: Sequence[str],
@ -147,6 +214,8 @@ async def async_get_translation(
"""
Get a translation value by path asynchronously.
Checks standard translations first, then custom translations.
Args:
hass: HomeAssistant instance
path: A sequence of keys defining the path to the translation value
@ -156,6 +225,20 @@ async def async_get_translation(
The translation value if found, None otherwise
"""
# Try standard translations first (config flow, home_types, etc.)
translations = await async_load_standard_translations(hass, language)
# Navigate to the requested path
current = translations
for key in path:
if not isinstance(current, dict) or key not in current:
break
current = current.get(key)
else:
# If we successfully navigated to the end, return the value
return current
# Fall back to custom translations if not found in standard translations
translations = await async_load_translations(hass, language)
# Navigate to the requested path
@ -176,6 +259,7 @@ def get_translation(
Get a translation value by path synchronously from the cache.
This function only accesses the cached translations to avoid blocking I/O.
Checks standard translations first, then custom translations.
Args:
path: A sequence of keys defining the path to the translation value
@ -185,26 +269,42 @@ def get_translation(
The translation value if found in cache, None otherwise
"""
# Only return from cache to avoid blocking I/O
if language not in _TRANSLATIONS_CACHE:
# Fall back to English if the requested language is not available
if language != "en" and "en" in _TRANSLATIONS_CACHE:
language = "en"
else:
return None
# Navigate to the requested path
current = _TRANSLATIONS_CACHE[language]
for key in path:
if not isinstance(current, dict):
return None
if key not in current:
# Log the missing key for debugging
LOGGER.debug("Translation key '%s' not found in path %s for language %s", key, path, language)
return None
current = current[key]
def _navigate_dict(d: dict, keys: Sequence[str]) -> Any:
"""Navigate through nested dict following the keys path."""
current = d
for key in keys:
if not isinstance(current, dict) or key not in current:
return None
current = current[key]
return current
return current
def _get_from_cache(cache: dict[str, dict], lang: str) -> Any:
"""Get translation from cache with fallback to English."""
if lang in cache:
result = _navigate_dict(cache[lang], path)
if result is not None:
return result
# Fallback to English if not found in requested language
if lang != "en" and "en" in cache:
result = _navigate_dict(cache["en"], path)
if result is not None:
return result
return None
# Try standard translations first
result = _get_from_cache(_STANDARD_TRANSLATIONS_CACHE, language)
if result is not None:
return result
# Fall back to custom translations
result = _get_from_cache(_TRANSLATIONS_CACHE, language)
if result is not None:
return result
# Log the missing key for debugging
LOGGER.debug("Translation key '%s' not found for language %s", path, language)
return None
# Convenience functions for backward compatibility and common usage patterns
@ -314,3 +414,57 @@ def get_price_level_translation(
"""
return get_translation(["sensor", "price_level", "price_levels", level], language)
async def async_get_home_type_translation(
hass: HomeAssistant,
home_type: str,
language: str = "en",
) -> str | None:
"""
Get a localized translation for a home type asynchronously.
Args:
hass: HomeAssistant instance
home_type: The home type (e.g., APARTMENT, HOUSE, etc.)
language: The language code (defaults to English)
Returns:
The localized home type if found, None otherwise
"""
return await async_get_translation(hass, ["home_types", home_type], language)
def get_home_type_translation(
home_type: str,
language: str = "en",
) -> str | None:
"""
Get a localized translation for a home type synchronously from the cache.
This function only accesses the cached translations to avoid blocking I/O.
Args:
home_type: The home type (e.g., APARTMENT, HOUSE, etc.)
language: The language code (defaults to English)
Returns:
The localized home type if found in cache, fallback to HOME_TYPES dict, or None
"""
translated = get_translation(["home_types", home_type], language)
if translated:
LOGGER.debug("Found translation for home type '%s' in language '%s': %s", home_type, language, translated)
return translated
fallback = HOME_TYPES.get(home_type)
LOGGER.debug(
"No translation found for home type '%s' in language '%s', using fallback: %s. "
"Available caches: standard=%s, custom=%s",
home_type,
language,
fallback,
list(_STANDARD_TRANSLATIONS_CACHE.keys()),
list(_TRANSLATIONS_CACHE.keys()),
)
return fallback

View file

@ -5,7 +5,7 @@ from __future__ import annotations
from homeassistant.helpers.device_registry import DeviceEntryType, DeviceInfo
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from .const import ATTRIBUTION, DOMAIN
from .const import ATTRIBUTION, DOMAIN, get_home_type_translation
from .coordinator import TibberPricesDataUpdateCoordinator
@ -19,14 +19,6 @@ class TibberPricesEntity(CoordinatorEntity[TibberPricesDataUpdateCoordinator]):
"""Initialize."""
super().__init__(coordinator)
# enum of home types
home_types = {
"APARTMENT": "Apartment",
"ROWHOUSE": "Rowhouse",
"HOUSE": "House",
"COTTAGE": "Cottage",
}
# Get user profile information from coordinator
user_profile = self.coordinator.get_user_profile()
@ -79,12 +71,16 @@ class TibberPricesEntity(CoordinatorEntity[TibberPricesDataUpdateCoordinator]):
except (KeyError, IndexError, TypeError):
home_name = "Tibber Home"
# Get translated home type using the configured language
language = coordinator.hass.config.language or "en"
translated_model = get_home_type_translation(home_type, language) if home_type else "Unknown"
self._attr_device_info = DeviceInfo(
entry_type=DeviceEntryType.SERVICE,
identifiers={(DOMAIN, coordinator.config_entry.unique_id or coordinator.config_entry.entry_id)},
name=home_name,
manufacturer="Tibber",
model=home_types.get(home_type, "Unknown") if home_type else "Unknown",
model=translated_model,
model_id=home_type if home_type else None,
serial_number=home_id if home_id else None,
)

View file

@ -9,6 +9,14 @@
"title": "Tibber Preisinformationen & Bewertungen",
"submit": "Token validieren"
},
"select_home": {
"description": "Wähle ein Zuhause, um Preisinformationen und Bewertungen abzurufen.",
"data": {
"home_id": "Zuhause"
},
"title": "Wähle ein Zuhause",
"submit": "Zuhause auswählen"
},
"finish": {
"description": "Wähle ein Zuhause, um Preisinformationen und Bewertungen abzurufen.",
"data": {
@ -33,7 +41,7 @@
},
"config_subentries": {
"home": {
"title": "Zuhause",
"title": "Zuhause hinzufügen",
"step": {
"user": {
"title": "Tibber Zuhause hinzufügen",
@ -134,6 +142,12 @@
}
}
},
"home_types": {
"APARTMENT": "Wohnung",
"ROWHOUSE": "Reihenhaus",
"HOUSE": "Haus",
"COTTAGE": "Ferienhaus"
},
"issues": {
"new_homes_available": {
"title": "Neue Tibber-Häuser erkannt",

View file

@ -9,6 +9,14 @@
"title": "Tibber Price Information & Ratings",
"submit": "Validate Token"
},
"select_home": {
"description": "Select a home to fetch price information and ratings.",
"data": {
"home_id": "Home"
},
"title": "Pick a Home",
"submit": "Select Home"
},
"finish": {
"description": "Select a home to fetch price information and ratings.",
"data": {
@ -149,6 +157,12 @@
}
}
},
"home_types": {
"APARTMENT": "Apartment",
"ROWHOUSE": "Rowhouse",
"HOUSE": "House",
"COTTAGE": "Cottage"
},
"issues": {
"new_homes_available": {
"title": "New Tibber homes detected",

View file

@ -34,10 +34,13 @@ class TestBasicCoordinator:
@pytest.fixture
def coordinator(self, mock_hass, mock_config_entry, mock_session):
"""Create a coordinator instance."""
with patch(
"custom_components.tibber_prices.coordinator.aiohttp_client.async_get_clientsession",
return_value=mock_session,
), patch("custom_components.tibber_prices.coordinator.Store") as mock_store_class:
with (
patch(
"custom_components.tibber_prices.coordinator.aiohttp_client.async_get_clientsession",
return_value=mock_session,
),
patch("custom_components.tibber_prices.coordinator.Store") as mock_store_class,
):
mock_store = Mock()
mock_store.async_load = AsyncMock(return_value=None)
mock_store.async_save = AsyncMock()