fix linting errors

This commit is contained in:
Julian Pawlowski 2025-11-03 00:32:27 +00:00
parent d3c91e162a
commit 79556768cc
10 changed files with 314 additions and 233 deletions

View file

@ -4,6 +4,7 @@ from __future__ import annotations
import asyncio
import logging
import re
import socket
from datetime import timedelta
from enum import Enum
@ -752,8 +753,6 @@ class TibberPricesApiClient:
def _extract_retry_delay(self, error: Exception, retry: int) -> int:
"""Extract retry delay from rate limit error or use exponential backoff."""
import re
error_msg = str(error)
# Try to extract Retry-After value from error message

View file

@ -27,9 +27,13 @@ if TYPE_CHECKING:
from .const import (
CONF_BEST_PRICE_FLEX,
CONF_EXTENDED_DESCRIPTIONS,
CONF_PEAK_PRICE_FLEX,
DEFAULT_BEST_PRICE_FLEX,
DEFAULT_EXTENDED_DESCRIPTIONS,
DEFAULT_PEAK_PRICE_FLEX,
async_get_entity_description,
get_entity_description,
)
MINUTES_PER_INTERVAL = 15
@ -579,13 +583,6 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
# Get user's language preference
language = self.hass.config.language if self.hass.config.language else "en"
# Import async function to get descriptions
from .const import (
CONF_EXTENDED_DESCRIPTIONS,
DEFAULT_EXTENDED_DESCRIPTIONS,
async_get_entity_description,
)
# Add basic description
description = await async_get_entity_description(
self.hass, "binary_sensor", self.entity_description.translation_key, language, "description"
@ -650,13 +647,6 @@ class TibberPricesBinarySensor(TibberPricesEntity, BinarySensorEntity):
# Get user's language preference
language = self.hass.config.language if self.hass.config.language else "en"
# Import synchronous function to get cached descriptions
from .const import (
CONF_EXTENDED_DESCRIPTIONS,
DEFAULT_EXTENDED_DESCRIPTIONS,
get_entity_description,
)
# Add basic description from cache
description = get_entity_description(
"binary_sensor", self.entity_description.translation_key, language, "description"

View file

@ -291,7 +291,7 @@ async def _refresh_user_data(call: ServiceCall) -> dict[str, Any]:
# Get the entry and coordinator
try:
entry, coordinator, _ = _get_entry_and_data(hass, entry_id)
_, coordinator, _ = _get_entry_and_data(hass, entry_id)
except ServiceValidationError as ex:
return {
"success": False,
@ -449,7 +449,7 @@ def _determine_now_and_simulation(
return now, is_simulated
def _select_intervals(
def _select_intervals( # noqa: PLR0912
merged: list[dict], coordinator: Any, day: str, now: datetime, *, simulated: bool
) -> tuple[Any, Any, Any]:
"""Select previous, current, and next intervals for the given day and time."""

View file

@ -1 +1 @@
# Tests package
"""Tests package for Tibber Prices integration."""

View file

@ -11,14 +11,14 @@ class TestBasicCoordinator:
"""Test basic coordinator functionality."""
@pytest.fixture
def mock_hass(self):
def mock_hass(self) -> Mock:
"""Create a mock Home Assistant instance."""
hass = Mock()
hass.data = {}
return hass
@pytest.fixture
def mock_config_entry(self):
def mock_config_entry(self) -> Mock:
"""Create a mock config entry."""
config_entry = Mock()
config_entry.unique_id = "test_home_123"
@ -27,12 +27,14 @@ class TestBasicCoordinator:
return config_entry
@pytest.fixture
def mock_session(self):
def mock_session(self) -> Mock:
"""Create a mock session."""
return Mock()
@pytest.fixture
def coordinator(self, mock_hass, mock_config_entry, mock_session):
def coordinator(
self, mock_hass: Mock, mock_config_entry: Mock, mock_session: Mock
) -> TibberPricesDataUpdateCoordinator:
"""Create a coordinator instance."""
with (
patch(
@ -48,34 +50,34 @@ class TestBasicCoordinator:
return TibberPricesDataUpdateCoordinator(mock_hass, mock_config_entry)
def test_coordinator_creation(self, coordinator):
def test_coordinator_creation(self, coordinator: TibberPricesDataUpdateCoordinator) -> None:
"""Test that coordinator can be created."""
assert coordinator is not None
assert hasattr(coordinator, "get_current_interval_data")
assert hasattr(coordinator, "get_all_intervals")
assert hasattr(coordinator, "get_user_profile")
assert coordinator is not None # noqa: S101
assert hasattr(coordinator, "get_current_interval_data") # noqa: S101
assert hasattr(coordinator, "get_all_intervals") # noqa: S101
assert hasattr(coordinator, "get_user_profile") # noqa: S101
def test_is_main_entry(self, coordinator):
def test_is_main_entry(self, coordinator: TibberPricesDataUpdateCoordinator) -> None:
"""Test main entry detection."""
# First coordinator should be main entry
assert coordinator.is_main_entry() is True
assert coordinator.is_main_entry() is True # noqa: S101
def test_get_user_profile_no_data(self, coordinator):
def test_get_user_profile_no_data(self, coordinator: TibberPricesDataUpdateCoordinator) -> None:
"""Test getting user profile when no data is cached."""
profile = coordinator.get_user_profile()
assert profile == {"last_updated": None, "cached_user_data": False}
assert profile == {"last_updated": None, "cached_user_data": False} # noqa: S101
def test_get_user_homes_no_data(self, coordinator):
def test_get_user_homes_no_data(self, coordinator: TibberPricesDataUpdateCoordinator) -> None:
"""Test getting user homes when no data is cached."""
homes = coordinator.get_user_homes()
assert homes == []
assert homes == [] # noqa: S101
def test_get_current_interval_data_no_data(self, coordinator):
def test_get_current_interval_data_no_data(self, coordinator: TibberPricesDataUpdateCoordinator) -> None:
"""Test getting current interval data when no data is available."""
current_data = coordinator.get_current_interval_data()
assert current_data is None
assert current_data is None # noqa: S101
def test_get_all_intervals_no_data(self, coordinator):
def test_get_all_intervals_no_data(self, coordinator: TibberPricesDataUpdateCoordinator) -> None:
"""Test getting all intervals when no data is available."""
intervals = coordinator.get_all_intervals()
assert intervals == []
assert intervals == [] # noqa: S101

View file

@ -2,10 +2,12 @@
from __future__ import annotations
import asyncio
from unittest.mock import AsyncMock, Mock, patch
import pytest
from custom_components.tibber_prices.api import TibberPricesApiClientCommunicationError
from custom_components.tibber_prices.const import DOMAIN
from custom_components.tibber_prices.coordinator import TibberPricesDataUpdateCoordinator
@ -25,8 +27,6 @@ class TestEnhancedCoordinator:
@pytest.fixture
def mock_hass(self) -> Mock:
"""Create a mock Home Assistant instance."""
import asyncio
hass = Mock()
hass.data = {}
# Mock the event loop for time tracking
@ -96,7 +96,7 @@ class TestEnhancedCoordinator:
)
# Verify main coordinator is marked as main entry
assert main_coordinator.is_main_entry()
assert main_coordinator.is_main_entry() # noqa: S101
# Create subentry coordinator
sub_config_entry = Mock()
@ -120,7 +120,7 @@ class TestEnhancedCoordinator:
)
# Verify subentry coordinator is not marked as main entry
assert not sub_coordinator.is_main_entry()
assert not sub_coordinator.is_main_entry() # noqa: S101
@pytest.mark.asyncio
async def test_user_data_functionality(self, coordinator: TibberPricesDataUpdateCoordinator) -> None:
@ -136,17 +136,17 @@ class TestEnhancedCoordinator:
# Test refresh user data
result = await coordinator.refresh_user_data()
assert result
assert result # noqa: S101
# Test get user profile
profile = coordinator.get_user_profile()
assert isinstance(profile, dict)
assert "last_updated" in profile
assert "cached_user_data" in profile
assert isinstance(profile, dict) # noqa: S101
assert "last_updated" in profile # noqa: S101
assert "cached_user_data" in profile # noqa: S101
# Test get user homes
homes = coordinator.get_user_homes()
assert isinstance(homes, list)
assert isinstance(homes, list) # noqa: S101
@pytest.mark.asyncio
async def test_data_update_with_multi_home_response(self, coordinator: TibberPricesDataUpdateCoordinator) -> None:
@ -185,19 +185,17 @@ class TestEnhancedCoordinator:
await coordinator.async_refresh()
# Verify coordinator has data
assert coordinator.data is not None
assert "priceInfo" in coordinator.data
assert "priceRating" in coordinator.data
assert coordinator.data is not None # noqa: S101
assert "priceInfo" in coordinator.data # noqa: S101
assert "priceRating" in coordinator.data # noqa: S101
# Test public API methods work
intervals = coordinator.get_all_intervals()
assert isinstance(intervals, list)
assert isinstance(intervals, list) # noqa: S101
@pytest.mark.asyncio
async def test_error_handling_with_cache_fallback(self, coordinator: TibberPricesDataUpdateCoordinator) -> None:
"""Test error handling with fallback to cached data."""
from custom_components.tibber_prices.api import TibberPricesApiClientCommunicationError
# Set up cached data using the store mechanism
test_cached_data = {
"timestamp": "2025-05-25T00:00:00Z",
@ -212,7 +210,7 @@ class TestEnhancedCoordinator:
}
# Mock store to return cached data
coordinator._store.async_load = AsyncMock(
coordinator._store.async_load = AsyncMock( # noqa: SLF001
return_value={
"price_data": test_cached_data,
"user_data": None,
@ -222,7 +220,7 @@ class TestEnhancedCoordinator:
)
# Load the cache
await coordinator._load_cache()
await coordinator._load_cache() # noqa: SLF001
# Mock API to raise communication error
coordinator.api.async_get_price_info = AsyncMock(
@ -233,7 +231,7 @@ class TestEnhancedCoordinator:
await coordinator.async_refresh()
# Verify coordinator has fallback data
assert coordinator.data is not None
assert coordinator.data is not None # noqa: S101
@pytest.mark.asyncio
async def test_cache_persistence(self, coordinator: TibberPricesDataUpdateCoordinator) -> None:
@ -252,4 +250,4 @@ class TestEnhancedCoordinator:
await coordinator.async_refresh()
# Verify data was cached (store should have been called)
coordinator._store.async_save.assert_called()
coordinator._store.async_save.assert_called() # noqa: SLF001

View file

@ -2,14 +2,21 @@
from __future__ import annotations
from datetime import datetime, timedelta
from datetime import UTC, datetime, timedelta
from unittest.mock import Mock, patch
from custom_components.tibber_prices.coordinator import TibberPricesDataUpdateCoordinator
# Constants for test validation
INTERVALS_PER_DAY = 96
BASE_PRICE = 0.20
PRICE_INCREMENT = 0.001
def generate_price_intervals(
start_date: datetime,
num_intervals: int = 96,
base_price: float = 0.20,
num_intervals: int = INTERVALS_PER_DAY,
base_price: float = BASE_PRICE,
) -> list[dict]:
"""Generate realistic price intervals for a day."""
intervals = []
@ -19,7 +26,7 @@ def generate_price_intervals(
intervals.append(
{
"startsAt": current_time.isoformat(),
"total": base_price + (i * 0.001),
"total": base_price + (i * PRICE_INCREMENT),
"level": "NORMAL",
}
)
@ -30,23 +37,23 @@ def generate_price_intervals(
def test_midnight_turnover_with_stale_today_data() -> None:
"""Test midnight turnover when today's data is from the previous day."""
from custom_components.tibber_prices.coordinator import TibberPricesDataUpdateCoordinator
coordinator = Mock(spec=TibberPricesDataUpdateCoordinator)
coordinator._perform_midnight_turnover = TibberPricesDataUpdateCoordinator._perform_midnight_turnover.__get__(
coordinator._perform_midnight_turnover = ( # noqa: SLF001
TibberPricesDataUpdateCoordinator._perform_midnight_turnover.__get__( # noqa: SLF001
coordinator, TibberPricesDataUpdateCoordinator
)
)
today_local = datetime(2025, 11, 2, 14, 30)
today_local = datetime(2025, 11, 2, 14, 30, tzinfo=UTC)
yesterday_prices = generate_price_intervals(
datetime(2025, 11, 1, 0, 0),
num_intervals=96,
datetime(2025, 11, 1, 0, 0, tzinfo=UTC),
num_intervals=INTERVALS_PER_DAY,
)
tomorrow_prices = generate_price_intervals(
datetime(2025, 11, 3, 0, 0),
num_intervals=96,
datetime(2025, 11, 3, 0, 0, tzinfo=UTC),
num_intervals=INTERVALS_PER_DAY,
)
price_info = {
@ -56,40 +63,40 @@ def test_midnight_turnover_with_stale_today_data() -> None:
}
with patch("custom_components.tibber_prices.coordinator.dt_util") as mock_dt_util:
mock_dt_util.as_local.side_effect = lambda dt: dt if dt else datetime(2025, 11, 2)
mock_dt_util.as_local.side_effect = lambda dt: (dt if dt else datetime(2025, 11, 2, tzinfo=UTC))
mock_dt_util.now.return_value = today_local
mock_dt_util.parse_datetime.side_effect = lambda s: (datetime.fromisoformat(s) if s else None)
rotated = coordinator._perform_midnight_turnover(price_info)
rotated = coordinator._perform_midnight_turnover(price_info) # noqa: SLF001
assert len(rotated["yesterday"]) == 96
assert rotated["yesterday"][0]["startsAt"].startswith("2025-11-01")
assert len(rotated["yesterday"]) == INTERVALS_PER_DAY # noqa: S101
assert rotated["yesterday"][0]["startsAt"].startswith("2025-11-01") # noqa: S101
assert len(rotated["today"]) == 96
assert rotated["today"][0]["startsAt"].startswith("2025-11-03")
assert len(rotated["today"]) == INTERVALS_PER_DAY # noqa: S101
assert rotated["today"][0]["startsAt"].startswith("2025-11-03") # noqa: S101
assert len(rotated["tomorrow"]) == 0
assert len(rotated["tomorrow"]) == 0 # noqa: S101
def test_midnight_turnover_no_rotation_needed() -> None:
"""Test that turnover skips rotation when data is already current."""
from custom_components.tibber_prices.coordinator import TibberPricesDataUpdateCoordinator
coordinator = Mock(spec=TibberPricesDataUpdateCoordinator)
coordinator._perform_midnight_turnover = TibberPricesDataUpdateCoordinator._perform_midnight_turnover.__get__(
coordinator._perform_midnight_turnover = ( # noqa: SLF001
TibberPricesDataUpdateCoordinator._perform_midnight_turnover.__get__( # noqa: SLF001
coordinator, TibberPricesDataUpdateCoordinator
)
)
today_local = datetime(2025, 11, 2, 14, 30)
today_local = datetime(2025, 11, 2, 14, 30, tzinfo=UTC)
today_prices = generate_price_intervals(
datetime(2025, 11, 2, 0, 0),
num_intervals=96,
datetime(2025, 11, 2, 0, 0, tzinfo=UTC),
num_intervals=INTERVALS_PER_DAY,
)
tomorrow_prices = generate_price_intervals(
datetime(2025, 11, 3, 0, 0),
num_intervals=96,
datetime(2025, 11, 3, 0, 0, tzinfo=UTC),
num_intervals=INTERVALS_PER_DAY,
)
price_info = {
@ -99,27 +106,27 @@ def test_midnight_turnover_no_rotation_needed() -> None:
}
with patch("custom_components.tibber_prices.coordinator.dt_util") as mock_dt_util:
mock_dt_util.as_local.side_effect = lambda dt: dt if dt else datetime(2025, 11, 2)
mock_dt_util.as_local.side_effect = lambda dt: (dt if dt else datetime(2025, 11, 2, tzinfo=UTC))
mock_dt_util.now.return_value = today_local
mock_dt_util.parse_datetime.side_effect = lambda s: (datetime.fromisoformat(s) if s else None)
rotated = coordinator._perform_midnight_turnover(price_info)
rotated = coordinator._perform_midnight_turnover(price_info) # noqa: SLF001
assert rotated == price_info
assert rotated["today"][0]["startsAt"].startswith("2025-11-02")
assert rotated == price_info # noqa: S101
assert rotated["today"][0]["startsAt"].startswith("2025-11-02") # noqa: S101
def test_scenario_missed_midnight_recovery() -> None:
"""Scenario: Server was down at midnight, comes back online later."""
from custom_components.tibber_prices.coordinator import TibberPricesDataUpdateCoordinator
coordinator = Mock(spec=TibberPricesDataUpdateCoordinator)
coordinator._perform_midnight_turnover = TibberPricesDataUpdateCoordinator._perform_midnight_turnover.__get__(
coordinator._perform_midnight_turnover = ( # noqa: SLF001
TibberPricesDataUpdateCoordinator._perform_midnight_turnover.__get__( # noqa: SLF001
coordinator, TibberPricesDataUpdateCoordinator
)
)
yesterday_prices = generate_price_intervals(datetime(2025, 11, 1, 0, 0))
tomorrow_prices = generate_price_intervals(datetime(2025, 11, 2, 0, 0))
yesterday_prices = generate_price_intervals(datetime(2025, 11, 1, 0, 0, tzinfo=UTC))
tomorrow_prices = generate_price_intervals(datetime(2025, 11, 2, 0, 0, tzinfo=UTC))
price_info = {
"yesterday": [],
@ -128,34 +135,34 @@ def test_scenario_missed_midnight_recovery() -> None:
}
with patch("custom_components.tibber_prices.coordinator.dt_util") as mock_dt_util:
current_local = datetime(2025, 11, 2, 14, 0)
current_local = datetime(2025, 11, 2, 14, 0, tzinfo=UTC)
mock_dt_util.as_local.side_effect = lambda dt: (dt if isinstance(dt, datetime) else current_local)
mock_dt_util.now.return_value = current_local
mock_dt_util.parse_datetime.side_effect = lambda s: (datetime.fromisoformat(s) if s else None)
rotated = coordinator._perform_midnight_turnover(price_info)
rotated = coordinator._perform_midnight_turnover(price_info) # noqa: SLF001
assert len(rotated["yesterday"]) == 96
assert rotated["yesterday"][0]["startsAt"].startswith("2025-11-01")
assert len(rotated["yesterday"]) == INTERVALS_PER_DAY # noqa: S101
assert rotated["yesterday"][0]["startsAt"].startswith("2025-11-01") # noqa: S101
assert len(rotated["today"]) == 96
assert rotated["today"][0]["startsAt"].startswith("2025-11-02")
assert len(rotated["today"]) == INTERVALS_PER_DAY # noqa: S101
assert rotated["today"][0]["startsAt"].startswith("2025-11-02") # noqa: S101
assert len(rotated["tomorrow"]) == 0
assert len(rotated["tomorrow"]) == 0 # noqa: S101
def test_scenario_normal_daily_refresh() -> None:
"""Scenario: Normal daily refresh at 5 AM (all data is current)."""
from custom_components.tibber_prices.coordinator import TibberPricesDataUpdateCoordinator
coordinator = Mock(spec=TibberPricesDataUpdateCoordinator)
coordinator._perform_midnight_turnover = TibberPricesDataUpdateCoordinator._perform_midnight_turnover.__get__(
coordinator._perform_midnight_turnover = ( # noqa: SLF001
TibberPricesDataUpdateCoordinator._perform_midnight_turnover.__get__( # noqa: SLF001
coordinator, TibberPricesDataUpdateCoordinator
)
)
today_prices = generate_price_intervals(datetime(2025, 11, 2, 0, 0))
tomorrow_prices = generate_price_intervals(datetime(2025, 11, 3, 0, 0))
today_prices = generate_price_intervals(datetime(2025, 11, 2, 0, 0, tzinfo=UTC))
tomorrow_prices = generate_price_intervals(datetime(2025, 11, 3, 0, 0, tzinfo=UTC))
price_info = {
"yesterday": [],
@ -164,15 +171,15 @@ def test_scenario_normal_daily_refresh() -> None:
}
with patch("custom_components.tibber_prices.coordinator.dt_util") as mock_dt_util:
current_local = datetime(2025, 11, 2, 5, 0)
current_local = datetime(2025, 11, 2, 5, 0, tzinfo=UTC)
mock_dt_util.as_local.side_effect = lambda dt: (dt if isinstance(dt, datetime) else current_local)
mock_dt_util.now.return_value = current_local
mock_dt_util.parse_datetime.side_effect = lambda s: (datetime.fromisoformat(s) if s else None)
rotated = coordinator._perform_midnight_turnover(price_info)
rotated = coordinator._perform_midnight_turnover(price_info) # noqa: SLF001
assert len(rotated["today"]) == 96
assert rotated["today"][0]["startsAt"].startswith("2025-11-02")
assert len(rotated["tomorrow"]) == 96
assert rotated["tomorrow"][0]["startsAt"].startswith("2025-11-03")
assert len(rotated["today"]) == INTERVALS_PER_DAY # noqa: S101
assert rotated["today"][0]["startsAt"].startswith("2025-11-02") # noqa: S101
assert len(rotated["tomorrow"]) == INTERVALS_PER_DAY # noqa: S101
assert rotated["tomorrow"][0]["startsAt"].startswith("2025-11-03") # noqa: S101

View file

@ -10,6 +10,18 @@ from custom_components.tibber_prices.price_utils import (
)
from homeassistant.util import dt as dt_util
# Constants for testing
TOLERANCE_PERCENT = 0.001
TOLERANCE_DIFF = 0.01
PERCENT_50 = 50.0
PERCENT_1 = 1.0
INTERVALS_PER_DAY = 96
BASE_PRICE = 0.10
NEXT_PRICE = 0.15
TOMORROW_PRICE = 0.12
THRESHOLD_LOW = -10
THRESHOLD_HIGH = 10
def test_calculate_trailing_average_for_interval() -> None:
"""Test trailing average calculation for a specific interval."""
@ -18,12 +30,12 @@ def test_calculate_trailing_average_for_interval() -> None:
prices = []
# Create 96 quarter-hourly intervals (24 hours worth)
for i in range(96):
for i in range(INTERVALS_PER_DAY):
price_time = base_time - timedelta(hours=24) + timedelta(minutes=15 * i)
prices.append(
{
"startsAt": price_time.isoformat(),
"total": 0.1 + (i * 0.001), # Incrementing price
"total": BASE_PRICE + (i * 0.001), # Incrementing price
}
)
@ -31,32 +43,32 @@ def test_calculate_trailing_average_for_interval() -> None:
test_time = base_time
average = calculate_trailing_average_for_interval(test_time, prices)
assert average is not None
assert average is not None # noqa: S101
# Average of 96 prices from 0.1 to 0.195 (0.1 + 95*0.001)
expected_avg = (0.1 + 0.195) / 2 # ~0.1475
assert abs(average - expected_avg) < 0.001
expected_avg = (BASE_PRICE + 0.195) / 2 # ~0.1475
assert abs(average - expected_avg) < TOLERANCE_PERCENT # noqa: S101
def test_calculate_difference_percentage() -> None:
"""Test difference percentage calculation."""
current = 0.15
average = 0.10
current = NEXT_PRICE
average = BASE_PRICE
diff = calculate_difference_percentage(current, average)
assert diff is not None
assert abs(diff - 50.0) < 0.01 # 50% higher than average
assert diff is not None # noqa: S101
assert abs(diff - PERCENT_50) < TOLERANCE_DIFF # noqa: S101
# Test with same price
diff = calculate_difference_percentage(0.10, 0.10)
assert diff == 0.0
diff = calculate_difference_percentage(BASE_PRICE, BASE_PRICE)
assert diff == 0.0 # noqa: S101
# Test with None average
diff = calculate_difference_percentage(0.15, None)
assert diff is None
diff = calculate_difference_percentage(NEXT_PRICE, None)
assert diff is None # noqa: S101
# Test with zero average
diff = calculate_difference_percentage(0.15, 0.0)
assert diff is None
diff = calculate_difference_percentage(NEXT_PRICE, 0.0)
assert diff is None # noqa: S101
def test_enrich_price_info_with_differences() -> None:
@ -71,12 +83,12 @@ def test_enrich_price_info_with_differences() -> None:
}
# Fill yesterday with constant price
for i in range(96): # 96 intervals = 24 hours
for i in range(INTERVALS_PER_DAY): # 96 intervals = 24 hours
price_time = base_time - timedelta(days=1) + timedelta(minutes=15 * i)
price_info["yesterday"].append(
{
"startsAt": price_time.isoformat(),
"total": 0.10,
"total": BASE_PRICE,
}
)
@ -84,7 +96,7 @@ def test_enrich_price_info_with_differences() -> None:
price_info["today"].append(
{
"startsAt": base_time.isoformat(),
"total": 0.15,
"total": NEXT_PRICE,
}
)
@ -92,73 +104,70 @@ def test_enrich_price_info_with_differences() -> None:
price_info["tomorrow"].append(
{
"startsAt": (base_time + timedelta(days=1)).isoformat(),
"total": 0.12,
"total": TOMORROW_PRICE,
}
)
enriched = enrich_price_info_with_differences(price_info)
# Today's price should have a difference calculated
assert "difference" in enriched["today"][0]
assert enriched["today"][0]["difference"] is not None
assert "difference" in enriched["today"][0] # noqa: S101
assert enriched["today"][0]["difference"] is not None # noqa: S101
# 0.15 vs average of 0.10 = 50% higher
assert abs(enriched["today"][0]["difference"] - 50.0) < 1.0
assert abs(enriched["today"][0]["difference"] - PERCENT_50) < PERCENT_1 # noqa: S101
# Today's price should also have a rating_level (50% > 10% threshold = HIGH)
assert "rating_level" in enriched["today"][0]
assert enriched["today"][0]["rating_level"] == "HIGH"
assert "rating_level" in enriched["today"][0] # noqa: S101
assert enriched["today"][0]["rating_level"] == "HIGH" # noqa: S101
# Tomorrow's price should also have a difference
assert "difference" in enriched["tomorrow"][0]
assert enriched["tomorrow"][0]["difference"] is not None
assert "difference" in enriched["tomorrow"][0] # noqa: S101
assert enriched["tomorrow"][0]["difference"] is not None # noqa: S101
# Tomorrow's price should have a rating_level
# The average will be pulled from yesterday (0.10) and today (0.15)
# With tomorrow price at 0.12, it should be close to NORMAL or LOW
assert "rating_level" in enriched["tomorrow"][0]
assert "rating_level" in enriched["tomorrow"][0] # noqa: S101
rating_level_tomorrow = enriched["tomorrow"][0]["rating_level"]
assert rating_level_tomorrow in {"LOW", "NORMAL"}
assert rating_level_tomorrow in {"LOW", "NORMAL"} # noqa: S101
def test_calculate_rating_level() -> None:
"""Test rating level calculation based on difference percentage and thresholds."""
threshold_low = -10
threshold_high = 10
# Test LOW threshold
level = calculate_rating_level(-15.0, threshold_low, threshold_high)
assert level == "LOW"
level = calculate_rating_level(-15.0, THRESHOLD_LOW, THRESHOLD_HIGH)
assert level == "LOW" # noqa: S101
# Test exact low threshold
level = calculate_rating_level(-10.0, threshold_low, threshold_high)
assert level == "LOW"
level = calculate_rating_level(-10.0, THRESHOLD_LOW, THRESHOLD_HIGH)
assert level == "LOW" # noqa: S101
# Test HIGH threshold
level = calculate_rating_level(15.0, threshold_low, threshold_high)
assert level == "HIGH"
level = calculate_rating_level(15.0, THRESHOLD_LOW, THRESHOLD_HIGH)
assert level == "HIGH" # noqa: S101
# Test exact high threshold
level = calculate_rating_level(10.0, threshold_low, threshold_high)
assert level == "HIGH"
level = calculate_rating_level(10.0, THRESHOLD_LOW, THRESHOLD_HIGH)
assert level == "HIGH" # noqa: S101
# Test NORMAL (between thresholds)
level = calculate_rating_level(0.0, threshold_low, threshold_high)
assert level == "NORMAL"
level = calculate_rating_level(0.0, THRESHOLD_LOW, THRESHOLD_HIGH)
assert level == "NORMAL" # noqa: S101
level = calculate_rating_level(5.0, threshold_low, threshold_high)
assert level == "NORMAL"
level = calculate_rating_level(5.0, THRESHOLD_LOW, THRESHOLD_HIGH)
assert level == "NORMAL" # noqa: S101
level = calculate_rating_level(-5.0, threshold_low, threshold_high)
assert level == "NORMAL"
level = calculate_rating_level(-5.0, THRESHOLD_LOW, THRESHOLD_HIGH)
assert level == "NORMAL" # noqa: S101
# Test None difference
level = calculate_rating_level(None, threshold_low, threshold_high)
assert level is None
level = calculate_rating_level(None, THRESHOLD_LOW, THRESHOLD_HIGH)
assert level is None # noqa: S101
# Test edge case: difference in both ranges (both ranges simultaneously)
# This shouldn't normally happen, but if low > high, return NORMAL
level = calculate_rating_level(5.0, 10, -10) # inverted thresholds
assert level == "NORMAL"
assert level == "NORMAL" # noqa: S101
if __name__ == "__main__":

View file

@ -5,15 +5,30 @@ from datetime import datetime, timedelta
from custom_components.tibber_prices.price_utils import enrich_price_info_with_differences
from homeassistant.util import dt as dt_util
# Constants for integration testing
INTERVALS_PER_DAY = 96
VARIATION_THRESHOLD = 0.05
HOURS_PER_DAY = 24
INTERVALS_PER_HOUR = 4
PI_APPROX = 3.14159
INTERVAL_24 = 24
INTERVALS_68 = 68
INTERVALS_92 = 92
def generate_price_intervals(base_time: datetime, hours: int, base_price: float, variation: float = 0.05) -> list:
def generate_price_intervals(
base_time: datetime,
hours: int,
base_price: float,
variation: float = VARIATION_THRESHOLD,
) -> list:
"""Generate realistic price intervals."""
intervals = []
for i in range(hours * 4): # 4 intervals per hour (15-minute intervals)
for i in range(hours * INTERVALS_PER_HOUR): # 4 intervals per hour (15-minute intervals)
time = base_time + timedelta(minutes=15 * i)
# Add sinusoidal variation (peak at 18:00, low at 6:00)
hour_of_day = time.hour + time.minute / 60
variation_factor = 1 + variation * (((hour_of_day - 6) / 12) * 3.14159)
variation_factor = 1 + variation * (((hour_of_day - 6) / 12) * PI_APPROX)
price = base_price * (1 + 0.1 * (variation_factor - 1))
intervals.append(
@ -35,12 +50,23 @@ def test_realistic_day_pricing() -> None:
# Generate realistic data
price_info = {
"yesterday": generate_price_intervals(base_time - timedelta(days=1), hours=24, base_price=0.12, variation=0.08),
"yesterday": generate_price_intervals(
base_time - timedelta(days=1),
hours=HOURS_PER_DAY,
base_price=0.12,
variation=0.08,
),
"today": generate_price_intervals(
base_time.replace(hour=0, minute=0), hours=24, base_price=0.15, variation=0.10
base_time.replace(hour=0, minute=0),
hours=HOURS_PER_DAY,
base_price=0.15,
variation=0.10,
),
"tomorrow": generate_price_intervals(
base_time.replace(hour=0, minute=0) + timedelta(days=1), hours=24, base_price=0.13, variation=0.07
base_time.replace(hour=0, minute=0) + timedelta(days=1),
hours=HOURS_PER_DAY,
base_price=0.13,
variation=0.07,
),
}
@ -50,34 +76,47 @@ def test_realistic_day_pricing() -> None:
# Verify all today intervals have differences
today_intervals = enriched["today"]
for interval in today_intervals:
assert "difference" in interval, f"Missing difference in today interval {interval['startsAt']}"
assert "rating_level" in interval, f"Missing rating_level in today interval {interval['startsAt']}"
assert "difference" in interval, ( # noqa: S101
f"Missing difference in today interval {interval['startsAt']}"
)
assert "rating_level" in interval, ( # noqa: S101
f"Missing rating_level in today interval {interval['startsAt']}"
)
# Verify all tomorrow intervals have differences
tomorrow_intervals = enriched["tomorrow"]
for interval in tomorrow_intervals:
assert "difference" in interval, f"Missing difference in tomorrow interval {interval['startsAt']}"
assert "rating_level" in interval, f"Missing rating_level in tomorrow interval {interval['startsAt']}"
assert "difference" in interval, ( # noqa: S101
f"Missing difference in tomorrow interval {interval['startsAt']}"
)
assert "rating_level" in interval, ( # noqa: S101
f"Missing rating_level in tomorrow interval {interval['startsAt']}"
)
# Verify yesterday is unchanged (except for missing difference)
yesterday_intervals = enriched["yesterday"]
assert len(yesterday_intervals) == 96
assert len(yesterday_intervals) == INTERVALS_PER_DAY # noqa: S101
# Analyze statistics
today_diffs = [i.get("difference") for i in today_intervals if i.get("difference") is not None]
today_levels = [i.get("rating_level") for i in today_intervals if i.get("rating_level") is not None]
tomorrow_levels = [i.get("rating_level") for i in tomorrow_intervals if i.get("rating_level") is not None]
# Verify rating_level values are valid
valid_levels = {"LOW", "NORMAL", "HIGH"}
assert all(level in valid_levels for level in today_levels), "Invalid rating_level in today intervals"
assert all(level in valid_levels for level in tomorrow_levels), "Invalid rating_level in tomorrow intervals"
assert all(level in valid_levels for level in today_levels), ( # noqa: S101
"Invalid rating_level in today intervals"
)
assert all(level in valid_levels for level in tomorrow_levels), ( # noqa: S101
"Invalid rating_level in tomorrow intervals"
)
# With realistic pricing variation and default thresholds of -10/+10,
# we should have at least 2 different levels (most likely HIGH and NORMAL for today,
# and NORMAL for tomorrow due to cheaper prices)
unique_today_levels = set(today_levels)
assert len(unique_today_levels) >= 1, "Today should have at least one rating level"
assert len(unique_today_levels) >= 1, ( # noqa: S101
"Today should have at least one rating level"
)
def test_day_boundary_calculations() -> None:
@ -86,9 +125,17 @@ def test_day_boundary_calculations() -> None:
# Create data that spans the midnight boundary
price_info = {
"yesterday": generate_price_intervals(midnight - timedelta(days=1), hours=24, base_price=0.10),
"today": generate_price_intervals(midnight, hours=24, base_price=0.15),
"tomorrow": generate_price_intervals(midnight + timedelta(days=1), hours=24, base_price=0.12),
"yesterday": generate_price_intervals(
midnight - timedelta(days=1),
hours=HOURS_PER_DAY,
base_price=0.10,
),
"today": generate_price_intervals(midnight, hours=HOURS_PER_DAY, base_price=0.15),
"tomorrow": generate_price_intervals(
midnight + timedelta(days=1),
hours=HOURS_PER_DAY,
base_price=0.12,
),
}
enriched = enrich_price_info_with_differences(price_info)
@ -97,13 +144,15 @@ def test_day_boundary_calculations() -> None:
midnight_tomorrow = enriched["tomorrow"][0]
# This should include all 96 intervals from yesterday and all 96 from today
assert "difference" in midnight_tomorrow
assert "difference" in midnight_tomorrow # noqa: S101
diff = midnight_tomorrow.get("difference")
# Since tomorrow is cheaper (0.12) than both yesterday (0.10) and today (0.15)
# The difference could be negative (cheap) or positive (expensive) depending on the mix
diff = midnight_tomorrow.get("difference")
assert diff is not None, "Midnight boundary interval should have difference"
assert diff is not None, ( # noqa: S101
"Midnight boundary interval should have difference"
)
def test_early_morning_calculations() -> None:
@ -111,21 +160,31 @@ def test_early_morning_calculations() -> None:
base_time = dt_util.now().replace(hour=6, minute=0, second=0, microsecond=0)
price_info = {
"yesterday": generate_price_intervals(base_time - timedelta(days=1), hours=24, base_price=0.12),
"today": generate_price_intervals(base_time.replace(hour=0, minute=0), hours=24, base_price=0.15),
"yesterday": generate_price_intervals(
base_time - timedelta(days=1),
hours=HOURS_PER_DAY,
base_price=0.12,
),
"today": generate_price_intervals(
base_time.replace(hour=0, minute=0),
hours=HOURS_PER_DAY,
base_price=0.15,
),
"tomorrow": generate_price_intervals(
base_time.replace(hour=0, minute=0) + timedelta(days=1), hours=24, base_price=0.13
base_time.replace(hour=0, minute=0) + timedelta(days=1),
hours=HOURS_PER_DAY,
base_price=0.13,
),
}
enriched = enrich_price_info_with_differences(price_info)
# Get 6 AM interval (24th interval of the day)
six_am_interval = enriched["today"][24]
assert "difference" in six_am_interval
six_am_interval = enriched["today"][INTERVAL_24]
assert "difference" in six_am_interval # noqa: S101
# At 6 AM, we should include:
# - Yesterday from 6 AM to midnight (68 intervals)
# - Today from midnight to 6 AM (24 intervals)
# Total: 92 intervals (not quite 24 hours)
assert "difference" in six_am_interval
assert "difference" in six_am_interval # noqa: S101

View file

@ -1,42 +1,57 @@
"""Test that min/max/average include enriched attributes."""
from datetime import datetime
from datetime import UTC, datetime
import pytest
from custom_components.tibber_prices.services import _get_price_stat, _get_price_stats
# Constants for service enrichment tests
PRICE_MIN = 0.15
PRICE_MID = 0.25
PRICE_MAX = 0.35
PRICE_MINOR_MIN = 15
PRICE_MINOR_MID = 25
PRICE_MINOR_MAX = 35
DIFF_MIN = -10.5
DIFF_MID = 5.0
DIFF_MAX = 25.3
DIFF_MIN_LOW = -15.0
DIFF_MID_ZERO = 0.0
PRICE_LOW = 0.10
PRICE_HIGH = 0.20
def test_min_max_intervals_include_enriched_attributes():
def test_min_max_intervals_include_enriched_attributes() -> None:
"""Test that min/max intervals contain difference and rating_level."""
merged = [
{
"start_time": "2025-11-01T00:00:00+01:00",
"end_time": "2025-11-01T01:00:00+01:00",
"start_dt": datetime(2025, 11, 1, 0, 0),
"price": 0.15,
"price_minor": 15,
"difference": -10.5,
"start_dt": datetime(2025, 11, 1, 0, 0, tzinfo=UTC),
"price": PRICE_MIN,
"price_minor": PRICE_MINOR_MIN,
"difference": DIFF_MIN,
"rating_level": "LOW",
"level": "VERY_CHEAP",
},
{
"start_time": "2025-11-01T01:00:00+01:00",
"end_time": "2025-11-01T02:00:00+01:00",
"start_dt": datetime(2025, 11, 1, 1, 0),
"price": 0.25,
"price_minor": 25,
"difference": 5.0,
"start_dt": datetime(2025, 11, 1, 1, 0, tzinfo=UTC),
"price": PRICE_MID,
"price_minor": PRICE_MINOR_MID,
"difference": DIFF_MID,
"rating_level": "NORMAL",
"level": "NORMAL",
},
{
"start_time": "2025-11-01T02:00:00+01:00",
"end_time": "2025-11-01T03:00:00+01:00",
"start_dt": datetime(2025, 11, 1, 2, 0),
"price": 0.35,
"price_minor": 35,
"difference": 25.3,
"start_dt": datetime(2025, 11, 1, 2, 0, tzinfo=UTC),
"price": PRICE_MAX,
"price_minor": PRICE_MINOR_MAX,
"difference": DIFF_MAX,
"rating_level": "HIGH",
"level": "EXPENSIVE",
},
@ -45,36 +60,38 @@ def test_min_max_intervals_include_enriched_attributes():
stats = _get_price_stats(merged)
# Verify min interval has all attributes
assert stats.price_min == 0.15
assert stats.price_min_interval is not None
assert stats.price_min_interval["difference"] == -10.5
assert stats.price_min_interval["rating_level"] == "LOW"
assert stats.price_min_interval["level"] == "VERY_CHEAP"
assert stats.price_min == PRICE_MIN # noqa: S101
assert stats.price_min_interval is not None # noqa: S101
assert stats.price_min_interval["difference"] == DIFF_MIN # noqa: S101
assert stats.price_min_interval["rating_level"] == "LOW" # noqa: S101
assert stats.price_min_interval["level"] == "VERY_CHEAP" # noqa: S101
# Verify max interval has all attributes
assert stats.price_max == 0.35
assert stats.price_max_interval is not None
assert stats.price_max_interval["difference"] == 25.3
assert stats.price_max_interval["rating_level"] == "HIGH"
assert stats.price_max_interval["level"] == "EXPENSIVE"
assert stats.price_max == PRICE_MAX # noqa: S101
assert stats.price_max_interval is not None # noqa: S101
assert stats.price_max_interval["difference"] == DIFF_MAX # noqa: S101
assert stats.price_max_interval["rating_level"] == "HIGH" # noqa: S101
assert stats.price_max_interval["level"] == "EXPENSIVE" # noqa: S101
# Verify average price is calculated
assert stats.price_avg == pytest.approx((0.15 + 0.25 + 0.35) / 3, rel=1e-4)
assert stats.price_avg == pytest.approx( # noqa: S101
(PRICE_MIN + PRICE_MID + PRICE_MAX) / 3, rel=1e-4
)
def test_get_price_stat_returns_full_interval():
def test_get_price_stat_returns_full_interval() -> None:
"""Test that _get_price_stat returns the complete interval dict."""
merged = [
{
"start_time": "2025-11-01T00:00:00+01:00",
"price": 0.10,
"difference": -15.0,
"price": PRICE_LOW,
"difference": DIFF_MIN_LOW,
"rating_level": "LOW",
},
{
"start_time": "2025-11-01T01:00:00+01:00",
"price": 0.20,
"difference": 0.0,
"price": PRICE_HIGH,
"difference": DIFF_MID_ZERO,
"rating_level": "NORMAL",
},
]
@ -83,24 +100,24 @@ def test_get_price_stat_returns_full_interval():
max_price, max_interval = _get_price_stat(merged, "max")
# Min should be first interval
assert min_price == 0.10
assert min_interval is not None
assert min_interval["difference"] == -15.0
assert min_interval["rating_level"] == "LOW"
assert min_price == PRICE_LOW # noqa: S101
assert min_interval is not None # noqa: S101
assert min_interval["difference"] == DIFF_MIN_LOW # noqa: S101
assert min_interval["rating_level"] == "LOW" # noqa: S101
# Max should be second interval
assert max_price == 0.20
assert max_interval is not None
assert max_interval["difference"] == 0.0
assert max_interval["rating_level"] == "NORMAL"
assert max_price == PRICE_HIGH # noqa: S101
assert max_interval is not None # noqa: S101
assert max_interval["difference"] == DIFF_MID_ZERO # noqa: S101
assert max_interval["rating_level"] == "NORMAL" # noqa: S101
def test_empty_merged_returns_none_intervals():
def test_empty_merged_returns_none_intervals() -> None:
"""Test that empty merged list returns None for intervals."""
stats = _get_price_stats([])
assert stats.price_min == 0
assert stats.price_min_interval is None
assert stats.price_max == 0
assert stats.price_max_interval is None
assert stats.price_avg == 0
assert stats.price_min == 0 # noqa: S101
assert stats.price_min_interval is None # noqa: S101
assert stats.price_max == 0 # noqa: S101
assert stats.price_max_interval is None # noqa: S101
assert stats.price_avg == 0 # noqa: S101