mirror of
https://github.com/jpawlowski/hass.tibber_prices.git
synced 2026-03-30 21:33:39 +00:00
Renamed 25 public classes + 1 Enum to include TibberPrices prefix
following Home Assistant integration naming standards.
All classes now follow pattern: TibberPrices{SemanticPurpose}
No package hierarchy in names (import path is namespace).
Key changes:
- Coordinator module: DataFetcher, DataTransformer, ListenerManager,
PeriodCalculator, TimeService (203 usages), CacheData
- Config flow: CannotConnectError, InvalidAuthError
- Entity utils: IconContext
- Sensor calculators: BaseCalculator + 8 subclasses
- Period handlers: 5 NamedTuples (PeriodConfig, PeriodData,
PeriodStatistics, ThresholdConfig, IntervalCriteria)
- Period handlers: SpikeCandidateContext (dataclass → NamedTuple)
- API: QueryType Enum
Documentation updates:
- AGENTS.md: Added Pyright code generation guidelines
- planning/class-naming-refactoring-plan.md: Complete execution log
Quality metrics:
- 0 Pyright errors (strict type checking)
- 0 Ruff errors (linting + formatting)
- All hassfest checks passed
- 79 files validated
Impact: Aligns with HA Core standards (TibberDataCoordinator pattern).
No user-facing changes - internal refactor only.
139 lines
5 KiB
Python
139 lines
5 KiB
Python
"""Volatility attribute builders for Tibber Prices sensors."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from datetime import timedelta
|
|
from typing import TYPE_CHECKING
|
|
|
|
from custom_components.tibber_prices.utils.price import calculate_volatility_level
|
|
|
|
if TYPE_CHECKING:
|
|
from custom_components.tibber_prices.coordinator.time_service import TibberPricesTimeService
|
|
|
|
|
|
def add_volatility_attributes(
|
|
attributes: dict,
|
|
cached_data: dict,
|
|
*,
|
|
time: TibberPricesTimeService, # noqa: ARG001
|
|
) -> None:
|
|
"""
|
|
Add attributes for volatility sensors.
|
|
|
|
Args:
|
|
attributes: Dictionary to add attributes to
|
|
cached_data: Dictionary containing cached sensor data
|
|
time: TibberPricesTimeService instance (required)
|
|
|
|
"""
|
|
if cached_data.get("volatility_attributes"):
|
|
attributes.update(cached_data["volatility_attributes"])
|
|
|
|
|
|
def get_prices_for_volatility(
|
|
volatility_type: str,
|
|
price_info: dict,
|
|
*,
|
|
time: TibberPricesTimeService,
|
|
) -> list[float]:
|
|
"""
|
|
Get price list for volatility calculation based on type.
|
|
|
|
Args:
|
|
volatility_type: One of "today", "tomorrow", "next_24h", "today_tomorrow"
|
|
price_info: Price information dictionary from coordinator data
|
|
time: TibberPricesTimeService instance (required)
|
|
|
|
Returns:
|
|
List of prices to analyze
|
|
|
|
"""
|
|
if volatility_type == "today":
|
|
return [float(p["total"]) for p in price_info.get("today", []) if "total" in p]
|
|
|
|
if volatility_type == "tomorrow":
|
|
return [float(p["total"]) for p in price_info.get("tomorrow", []) if "total" in p]
|
|
|
|
if volatility_type == "next_24h":
|
|
# Rolling 24h from now
|
|
now = time.now()
|
|
end_time = now + timedelta(hours=24)
|
|
prices = []
|
|
|
|
for day_key in ["today", "tomorrow"]:
|
|
for price_data in price_info.get(day_key, []):
|
|
starts_at = price_data.get("startsAt") # Already datetime in local timezone
|
|
if starts_at is None:
|
|
continue
|
|
|
|
if time.is_in_future(starts_at) and starts_at < end_time and "total" in price_data:
|
|
prices.append(float(price_data["total"]))
|
|
return prices
|
|
|
|
if volatility_type == "today_tomorrow":
|
|
# Combined today + tomorrow
|
|
prices = []
|
|
for day_key in ["today", "tomorrow"]:
|
|
for price_data in price_info.get(day_key, []):
|
|
if "total" in price_data:
|
|
prices.append(float(price_data["total"]))
|
|
return prices
|
|
|
|
return []
|
|
|
|
|
|
def add_volatility_type_attributes(
|
|
volatility_attributes: dict,
|
|
volatility_type: str,
|
|
price_info: dict,
|
|
thresholds: dict,
|
|
*,
|
|
time: TibberPricesTimeService,
|
|
) -> None:
|
|
"""
|
|
Add type-specific attributes for volatility sensors.
|
|
|
|
Args:
|
|
volatility_attributes: Dictionary to add type-specific attributes to
|
|
volatility_type: Type of volatility calculation
|
|
price_info: Price information dictionary from coordinator data
|
|
thresholds: Volatility thresholds configuration
|
|
time: TibberPricesTimeService instance (required)
|
|
|
|
"""
|
|
# Add timestamp for calendar day volatility sensors (midnight of the day)
|
|
if volatility_type == "today":
|
|
today_data = price_info.get("today", [])
|
|
if today_data:
|
|
volatility_attributes["timestamp"] = today_data[0].get("startsAt")
|
|
elif volatility_type == "tomorrow":
|
|
tomorrow_data = price_info.get("tomorrow", [])
|
|
if tomorrow_data:
|
|
volatility_attributes["timestamp"] = tomorrow_data[0].get("startsAt")
|
|
elif volatility_type == "today_tomorrow":
|
|
# For combined today+tomorrow, use today's midnight
|
|
today_data = price_info.get("today", [])
|
|
if today_data:
|
|
volatility_attributes["timestamp"] = today_data[0].get("startsAt")
|
|
|
|
# Add breakdown for today vs tomorrow
|
|
today_prices = [float(p["total"]) for p in price_info.get("today", []) if "total" in p]
|
|
tomorrow_prices = [float(p["total"]) for p in price_info.get("tomorrow", []) if "total" in p]
|
|
|
|
if today_prices:
|
|
today_vol = calculate_volatility_level(today_prices, **thresholds)
|
|
today_spread = (max(today_prices) - min(today_prices)) * 100
|
|
volatility_attributes["today_spread"] = round(today_spread, 2)
|
|
volatility_attributes["today_volatility"] = today_vol
|
|
volatility_attributes["interval_count_today"] = len(today_prices)
|
|
|
|
if tomorrow_prices:
|
|
tomorrow_vol = calculate_volatility_level(tomorrow_prices, **thresholds)
|
|
tomorrow_spread = (max(tomorrow_prices) - min(tomorrow_prices)) * 100
|
|
volatility_attributes["tomorrow_spread"] = round(tomorrow_spread, 2)
|
|
volatility_attributes["tomorrow_volatility"] = tomorrow_vol
|
|
volatility_attributes["interval_count_tomorrow"] = len(tomorrow_prices)
|
|
elif volatility_type == "next_24h":
|
|
# Add time window info
|
|
now = time.now()
|
|
volatility_attributes["timestamp"] = now
|