This commit is contained in:
Julian Pawlowski 2025-05-24 20:50:17 +00:00
parent b5c278920c
commit bd33fc7367
2 changed files with 185 additions and 121 deletions

View file

@ -2,12 +2,34 @@
from __future__ import annotations from __future__ import annotations
from typing import Any
import voluptuous as vol import voluptuous as vol
from homeassistant.config_entries import ConfigEntry, ConfigFlow, ConfigFlowResult, OptionsFlow from homeassistant.config_entries import (
ConfigEntry,
ConfigFlow,
ConfigFlowResult,
ConfigSubentry,
ConfigSubentryFlow,
OptionsFlow,
SubentryFlowResult,
)
from homeassistant.const import CONF_ACCESS_TOKEN from homeassistant.const import CONF_ACCESS_TOKEN
from homeassistant.helpers import selector from homeassistant.core import callback
from homeassistant.helpers.aiohttp_client import async_create_clientsession from homeassistant.helpers.aiohttp_client import async_create_clientsession
from homeassistant.helpers.selector import (
BooleanSelector,
NumberSelector,
NumberSelectorConfig,
NumberSelectorMode,
SelectSelector,
SelectSelectorConfig,
SelectSelectorMode,
TextSelector,
TextSelectorConfig,
TextSelectorType,
)
from .api import ( from .api import (
TibberPricesApiClient, TibberPricesApiClient,
@ -31,19 +53,18 @@ class TibberPricesFlowHandler(ConfigFlow, domain=DOMAIN):
"""Config flow for tibber_prices.""" """Config flow for tibber_prices."""
VERSION = 1 VERSION = 1
MINOR_VERSION = 0
def __init__(self) -> None: def __init__(self) -> None:
"""Initialize the config flow.""" """Initialize the config flow."""
super().__init__() super().__init__()
self._reauth_entry: ConfigEntry | None = None self._reauth_entry: ConfigEntry | None = None
self._pending_user_input: dict | None = None
@staticmethod @classmethod
def async_get_options_flow( @callback
config_entry: ConfigEntry, def async_get_supported_subentry_types(cls, config_entry: ConfigEntry) -> dict[str, type[ConfigSubentryFlow]]: # noqa: ARG003
) -> OptionsFlow: """Return subentries supported by this integration."""
"""Get the options flow for this handler.""" return {"home": TibberPricesSubentryFlowHandler}
return TibberPricesOptionsFlowHandler(config_entry)
@staticmethod @staticmethod
def async_get_reauth_flow(entry: ConfigEntry) -> ConfigFlow: def async_get_reauth_flow(entry: ConfigEntry) -> ConfigFlow:
@ -73,12 +94,36 @@ class TibberPricesFlowHandler(ConfigFlow, domain=DOMAIN):
LOGGER.exception(exception) LOGGER.exception(exception)
_errors["base"] = "unknown" _errors["base"] = "unknown"
else: else:
# Store viewer for use in finish step user_id = viewer.get("userId", None)
self._pending_user_input = { user_name = viewer.get("name") or user_id or "Unknown User"
"access_token": user_input[CONF_ACCESS_TOKEN], user_login = viewer.get("login", "N/A")
"viewer": viewer, homes = viewer.get("homes", [])
}
return await self.async_step_finish() if not user_id:
LOGGER.error("No user ID found: %s", viewer)
return self.async_abort(reason="unknown")
if not homes:
LOGGER.error("No homes found: %s", viewer)
return self.async_abort(reason="unknown")
LOGGER.debug("Viewer data received: %s", viewer)
data = {CONF_ACCESS_TOKEN: user_input[CONF_ACCESS_TOKEN], "homes": homes}
await self.async_set_unique_id(unique_id=str(user_id))
self._abort_if_unique_id_configured()
return self.async_create_entry(
title=user_name,
data=data,
description=f"{user_login} ({user_id})",
description_placeholders={
"user_id": user_id,
"user_name": user_name,
"user_login": user_login,
},
)
return self.async_show_form( return self.async_show_form(
step_id="user", step_id="user",
@ -87,9 +132,9 @@ class TibberPricesFlowHandler(ConfigFlow, domain=DOMAIN):
vol.Required( vol.Required(
CONF_ACCESS_TOKEN, CONF_ACCESS_TOKEN,
default=(user_input or {}).get(CONF_ACCESS_TOKEN, vol.UNDEFINED), default=(user_input or {}).get(CONF_ACCESS_TOKEN, vol.UNDEFINED),
): selector.TextSelector( ): TextSelector(
selector.TextSelectorConfig( TextSelectorConfig(
type=selector.TextSelectorType.TEXT, type=TextSelectorType.TEXT,
), ),
), ),
}, },
@ -97,57 +142,6 @@ class TibberPricesFlowHandler(ConfigFlow, domain=DOMAIN):
errors=_errors, errors=_errors,
) )
async def async_step_finish(self, user_input: dict | None = None) -> ConfigFlowResult:
"""Show a finish screen after successful setup, then create entry on submit."""
if self._pending_user_input is not None and user_input is None:
# First visit: show home selection
viewer = self._pending_user_input["viewer"]
homes = viewer.get("homes", [])
# Build choices: label = address or nickname, value = id
home_choices = {}
for home in homes:
label = home.get("appNickname") or home.get("address", {}).get("address1") or home["id"]
if home.get("address", {}).get("city"):
label += f", {home['address']['city']}"
home_choices[home["id"]] = label
schema = vol.Schema({vol.Required("home_id"): vol.In(home_choices)})
return self.async_show_form(
step_id="finish",
data_schema=schema,
description_placeholders={},
errors={},
last_step=True,
)
if self._pending_user_input is not None and user_input is not None:
# User selected home, create entry
home_id = user_input["home_id"]
viewer = self._pending_user_input["viewer"]
# Use the same label as shown to the user for the config entry title
home_label = None
for home in viewer.get("homes", []):
if home["id"] == home_id:
home_label = home.get("appNickname") or home.get("address", {}).get("address1") or home_id
if home.get("address", {}).get("city"):
home_label += f", {home['address']['city']}"
break
if not home_label:
home_label = viewer.get("name", "Tibber")
data = {
CONF_ACCESS_TOKEN: self._pending_user_input["access_token"],
CONF_EXTENDED_DESCRIPTIONS: DEFAULT_EXTENDED_DESCRIPTIONS,
CONF_BEST_PRICE_FLEX: DEFAULT_BEST_PRICE_FLEX,
CONF_PEAK_PRICE_FLEX: DEFAULT_PEAK_PRICE_FLEX,
}
self._pending_user_input = None
# Set unique_id to home_id
await self.async_set_unique_id(unique_id=home_id)
self._abort_if_unique_id_configured()
return self.async_create_entry(
title=home_label,
data=data,
)
return self.async_abort(reason="setup_complete")
async def _get_viewer_details(self, access_token: str) -> dict: async def _get_viewer_details(self, access_token: str) -> dict:
"""Validate credentials and return information about the account (viewer object).""" """Validate credentials and return information about the account (viewer object)."""
client = TibberPricesApiClient( client = TibberPricesApiClient(
@ -165,13 +159,12 @@ class TibberPricesReauthFlowHandler(ConfigFlow):
"""Initialize the reauth flow handler.""" """Initialize the reauth flow handler."""
self._entry = entry self._entry = entry
self._errors: dict[str, str] = {} self._errors: dict[str, str] = {}
self._pending_user_input: dict | None = None
async def async_step_user(self, user_input: dict | None = None) -> ConfigFlowResult: async def async_step_user(self, user_input: dict | None = None) -> ConfigFlowResult:
"""Prompt for a new access token, then go to finish for home selection.""" """Prompt for a new access token."""
if user_input is not None: if user_input is not None:
try: try:
viewer = await TibberPricesApiClient( await TibberPricesApiClient(
access_token=user_input[CONF_ACCESS_TOKEN], access_token=user_input[CONF_ACCESS_TOKEN],
session=async_create_clientsession(self.hass), session=async_create_clientsession(self.hass),
).async_get_viewer_details() ).async_get_viewer_details()
@ -185,63 +178,123 @@ class TibberPricesReauthFlowHandler(ConfigFlow):
LOGGER.exception(exception) LOGGER.exception(exception)
self._errors["base"] = "unknown" self._errors["base"] = "unknown"
else: else:
self._pending_user_input = { self.hass.config_entries.async_update_entry(
"access_token": user_input[CONF_ACCESS_TOKEN], self._entry,
"viewer": viewer.get("viewer", viewer), data={
} **self._entry.data,
return await self.async_step_finish() CONF_ACCESS_TOKEN: user_input[CONF_ACCESS_TOKEN],
},
)
return self.async_abort(reason="reauth_successful")
return self.async_show_form( return self.async_show_form(
step_id="user", step_id="user",
data_schema=vol.Schema( data_schema=vol.Schema(
{ {
vol.Required(CONF_ACCESS_TOKEN): selector.TextSelector( vol.Required(CONF_ACCESS_TOKEN): TextSelector(TextSelectorConfig(type=TextSelectorType.TEXT)),
selector.TextSelectorConfig(type=selector.TextSelectorType.TEXT)
),
} }
), ),
errors=self._errors, errors=self._errors,
) )
async def async_step_finish(self, user_input: dict | None = None) -> ConfigFlowResult:
"""Show home selection, then update config entry.""" class TibberPricesSubentryFlowHandler(ConfigSubentryFlow):
if self._pending_user_input is not None and user_input is None: """Handle subentry flows for tibber_prices."""
viewer = self._pending_user_input["viewer"]
homes = viewer.get("homes", []) async def async_step_user(self, user_input: dict[str, Any] | None = None) -> SubentryFlowResult:
home_choices = {} """User flow to add a new home."""
for home in homes: parent_entry = self._get_entry()
label = home.get("appNickname") or home.get("address", {}).get("address1") or home["id"] if not parent_entry:
if home.get("address", {}).get("city"): return self.async_abort(reason="no_parent_entry")
label += f", {home['address']['city']}"
home_choices[home["id"]] = label homes = parent_entry.data.get("homes", [])
schema = vol.Schema({vol.Required("home_id"): vol.In(home_choices)}) if not homes:
return self.async_abort(reason="no_available_homes")
if user_input is not None:
selected_home_id = user_input["home_id"]
selected_home = next((home for home in homes if home["id"] == selected_home_id), None)
if not selected_home:
return self.async_abort(reason="home_not_found")
home_title = self._get_home_title(selected_home)
home_id = selected_home["id"]
return self.async_create_entry(
title=home_title,
data={
"home_id": home_id,
"home_data": selected_home,
},
description=f"Subentry for {home_title}",
description_placeholders={"home_id": home_id},
unique_id=home_id,
)
# Get existing home IDs by checking all subentries for this parent
existing_home_ids = set()
for entry in self.hass.config_entries.async_entries(DOMAIN):
# Check if this entry has home_id data (indicating it's a subentry)
if entry.data.get("home_id") and entry != parent_entry:
existing_home_ids.add(entry.data["home_id"])
available_homes = [home for home in homes if home["id"] not in existing_home_ids]
if not available_homes:
return self.async_abort(reason="no_available_homes")
from homeassistant.helpers.selector import SelectOptionDict
home_options = [
SelectOptionDict(
value=home["id"],
label=self._get_home_title(home),
)
for home in available_homes
]
schema = vol.Schema(
{
vol.Required("home_id"): SelectSelector(
SelectSelectorConfig(
options=home_options,
mode=SelectSelectorMode.DROPDOWN,
)
)
}
)
return self.async_show_form( return self.async_show_form(
step_id="finish", step_id="user",
data_schema=schema, data_schema=schema,
description_placeholders={}, description_placeholders={},
errors={}, errors={},
last_step=True,
) )
if self._pending_user_input is not None and user_input is not None:
home_id = user_input["home_id"] def _get_home_title(self, home: dict) -> str:
# Update the config entry with new token and home_id """Generate a user-friendly title for a home."""
self.hass.config_entries.async_update_entry( title = home.get("appNickname")
self._entry, if title:
data={ return title
**self._entry.data,
CONF_ACCESS_TOKEN: self._pending_user_input["access_token"], address = home.get("address", {})
"home_id": home_id, if address:
}, parts = []
) if address.get("address1"):
self._pending_user_input = None parts.append(address["address1"])
return self.async_abort(reason="reauth_successful") if address.get("city"):
return self.async_abort(reason="setup_complete") parts.append(address["city"])
if parts:
return ", ".join(parts)
return home.get("id", "Unknown Home")
class TibberPricesOptionsFlowHandler(OptionsFlow): class TibberPricesOptionsSubentryFlowHandler(OptionsFlow):
"""Tibber Prices config flow options handler.""" """Tibber Prices config flow options handler."""
def __init__(self, config_entry: ConfigEntry) -> None: # noqa: ARG002 def __init__(self, config_entry: ConfigSubentry) -> None: # noqa: ARG002
"""Initialize options flow.""" """Initialize options flow."""
super().__init__() super().__init__()
@ -249,7 +302,6 @@ class TibberPricesOptionsFlowHandler(OptionsFlow):
"""Manage the options.""" """Manage the options."""
errors: dict[str, str] = {} errors: dict[str, str] = {}
# Build options schema
options = { options = {
vol.Optional( vol.Optional(
CONF_EXTENDED_DESCRIPTIONS, CONF_EXTENDED_DESCRIPTIONS,
@ -257,7 +309,7 @@ class TibberPricesOptionsFlowHandler(OptionsFlow):
CONF_EXTENDED_DESCRIPTIONS, CONF_EXTENDED_DESCRIPTIONS,
self.config_entry.data.get(CONF_EXTENDED_DESCRIPTIONS, DEFAULT_EXTENDED_DESCRIPTIONS), self.config_entry.data.get(CONF_EXTENDED_DESCRIPTIONS, DEFAULT_EXTENDED_DESCRIPTIONS),
), ),
): selector.BooleanSelector(), ): BooleanSelector(),
vol.Optional( vol.Optional(
CONF_BEST_PRICE_FLEX, CONF_BEST_PRICE_FLEX,
default=int( default=int(
@ -266,12 +318,12 @@ class TibberPricesOptionsFlowHandler(OptionsFlow):
self.config_entry.data.get(CONF_BEST_PRICE_FLEX, DEFAULT_BEST_PRICE_FLEX), self.config_entry.data.get(CONF_BEST_PRICE_FLEX, DEFAULT_BEST_PRICE_FLEX),
) )
), ),
): selector.NumberSelector( ): NumberSelector(
selector.NumberSelectorConfig( NumberSelectorConfig(
min=0, min=0,
max=100, max=100,
step=1, step=1,
mode=selector.NumberSelectorMode.SLIDER, mode=NumberSelectorMode.SLIDER,
), ),
), ),
vol.Optional( vol.Optional(
@ -282,12 +334,12 @@ class TibberPricesOptionsFlowHandler(OptionsFlow):
self.config_entry.data.get(CONF_PEAK_PRICE_FLEX, DEFAULT_PEAK_PRICE_FLEX), self.config_entry.data.get(CONF_PEAK_PRICE_FLEX, DEFAULT_PEAK_PRICE_FLEX),
) )
), ),
): selector.NumberSelector( ): NumberSelector(
selector.NumberSelectorConfig( NumberSelectorConfig(
min=0, min=0,
max=100, max=100,
step=1, step=1,
mode=selector.NumberSelectorMode.SLIDER, mode=NumberSelectorMode.SLIDER,
), ),
), ),
} }
@ -295,7 +347,6 @@ class TibberPricesOptionsFlowHandler(OptionsFlow):
if user_input is not None: if user_input is not None:
return self.async_create_entry(title="", data=user_input) return self.async_create_entry(title="", data=user_input)
# Prepare read-only info for description placeholders
description_placeholders = { description_placeholders = {
"unique_id": self.config_entry.unique_id or "", "unique_id": self.config_entry.unique_id or "",
} }

View file

@ -14,7 +14,6 @@ from homeassistant.core import HomeAssistant
VERSION = "0.1.0" VERSION = "0.1.0"
DOMAIN = "tibber_prices" DOMAIN = "tibber_prices"
CONF_ACCESS_TOKEN = "access_token" # noqa: S105
CONF_EXTENDED_DESCRIPTIONS = "extended_descriptions" CONF_EXTENDED_DESCRIPTIONS = "extended_descriptions"
CONF_BEST_PRICE_FLEX = "best_price_flex" CONF_BEST_PRICE_FLEX = "best_price_flex"
CONF_PEAK_PRICE_FLEX = "peak_price_flex" CONF_PEAK_PRICE_FLEX = "peak_price_flex"
@ -27,6 +26,20 @@ DEFAULT_EXTENDED_DESCRIPTIONS = False
DEFAULT_BEST_PRICE_FLEX = 5 # 5% flexibility for best price (user-facing, percent) DEFAULT_BEST_PRICE_FLEX = 5 # 5% flexibility for best price (user-facing, percent)
DEFAULT_PEAK_PRICE_FLEX = 5 # 5% flexibility for peak price (user-facing, percent) DEFAULT_PEAK_PRICE_FLEX = 5 # 5% flexibility for peak price (user-facing, percent)
# Home types
HOME_TYPE_APARTMENT = "APARTMENT"
HOME_TYPE_ROWHOUSE = "ROWHOUSE"
HOME_TYPE_HOUSE = "HOUSE"
HOME_TYPE_COTTAGE = "COTTAGE"
# Mapping for home types to their localized names
HOME_TYPES = {
HOME_TYPE_APARTMENT: "Apartment",
HOME_TYPE_ROWHOUSE: "Rowhouse",
HOME_TYPE_HOUSE: "House",
HOME_TYPE_COTTAGE: "Cottage",
}
# Price level constants # Price level constants
PRICE_LEVEL_NORMAL = "NORMAL" PRICE_LEVEL_NORMAL = "NORMAL"
PRICE_LEVEL_CHEAP = "CHEAP" PRICE_LEVEL_CHEAP = "CHEAP"