refactor(coordinator): replace DataFetcher with PriceDataManager

Rename and refactor data_fetching.py → price_data_manager.py to reflect
actual responsibilities:
- User data: Fetches directly via API, validates, caches
- Price data: Delegates to IntervalPool (single source of truth)

Key changes:
- Add should_fetch_tomorrow_data() for intelligent API call decisions
- Add include_tomorrow parameter to prevent API spam before 13:00
- Remove cached_price_data property (Pool is source of truth)
- Update tests to use new class name

Impact: Clearer separation of concerns, reduced API calls through
intelligent tomorrow data fetching logic.
This commit is contained in:
Julian Pawlowski 2025-12-23 14:13:43 +00:00
parent 78df8a4b17
commit cfc7cf6abc
2 changed files with 274 additions and 171 deletions

View file

@ -1,10 +1,32 @@
"""Data fetching logic for the coordinator."""
"""
Price data management for the coordinator.
This module manages all price-related data for the Tibber Prices integration:
**User Data** (fetched directly via API):
- Home metadata (name, address, timezone)
- Account info (subscription status)
- Currency settings
- Refreshed daily (24h interval)
**Price Data** (fetched via IntervalPool):
- Quarter-hourly price intervals
- Yesterday/today/tomorrow coverage
- The IntervalPool handles actual API fetching, deduplication, and caching
- This manager coordinates the data flow and user data refresh
Data flow:
Tibber API → IntervalPool → PriceDataManager → Coordinator → Sensors
(actual fetching)   (orchestration + user data)
Note: Price data is NOT cached in this module - IntervalPool is the single
source of truth. This module only caches user_data for daily refresh cycle.
"""
from __future__ import annotations
import asyncio
import logging
import secrets
from datetime import timedelta
from typing import TYPE_CHECKING, Any
@ -13,26 +35,41 @@ from custom_components.tibber_prices.api import (
TibberPricesApiClientCommunicationError,
TibberPricesApiClientError,
)
from homeassistant.core import callback
from homeassistant.exceptions import ConfigEntryAuthFailed
from homeassistant.helpers.update_coordinator import UpdateFailed
from . import cache, helpers
from .constants import TOMORROW_DATA_CHECK_HOUR, TOMORROW_DATA_RANDOM_DELAY_MAX
if TYPE_CHECKING:
from collections.abc import Callable
from datetime import datetime
from custom_components.tibber_prices.api import TibberPricesApiClient
from custom_components.tibber_prices.interval_pool import TibberPricesIntervalPool
from .time_service import TibberPricesTimeService
_LOGGER = logging.getLogger(__name__)
# Hour when Tibber publishes tomorrow's prices (around 13:00 local time)
# Before this hour, requesting tomorrow data will always fail → wasted API call
TOMORROW_DATA_AVAILABLE_HOUR = 13
class TibberPricesDataFetcher:
"""Handles data fetching, caching, and main/subentry coordination."""
class TibberPricesPriceDataManager:
"""
Manages price and user data for the coordinator.
Responsibilities:
- User data: Fetches directly via API, validates, caches with persistence
- Price data: Coordinates with IntervalPool (which does actual API fetching)
- Cache management: Loads/stores both data types to HA persistent storage
- Update decisions: Determines when fresh data is needed
Note: Despite the name, this class does NOT do the actual price fetching.
The IntervalPool handles API calls, deduplication, and interval management.
This class orchestrates WHEN to fetch and processes the results.
"""
def __init__( # noqa: PLR0913
self,
@ -42,19 +79,31 @@ class TibberPricesDataFetcher:
user_update_interval: timedelta,
time: TibberPricesTimeService,
home_id: str,
interval_pool: TibberPricesIntervalPool,
) -> None:
"""Initialize the data fetcher."""
"""
Initialize the price data manager.
Args:
api: API client for direct requests (user data only).
store: Home Assistant storage for persistence.
log_prefix: Prefix for log messages (e.g., "[Home Name]").
user_update_interval: How often to refresh user data (default: 1 day).
time: TimeService for time operations.
home_id: Home ID this manager is responsible for.
interval_pool: IntervalPool for price data (handles actual fetching).
"""
self.api = api
self._store = store
self._log_prefix = log_prefix
self._user_update_interval = user_update_interval
self.time: TibberPricesTimeService = time
self.home_id = home_id
self._interval_pool = interval_pool
# Cached data
self._cached_price_data: dict[str, Any] | None = None
# Cached data (user data only - price data is in IntervalPool)
self._cached_user_data: dict[str, Any] | None = None
self._last_price_update: datetime | None = None
self._last_user_update: datetime | None = None
def _log(self, level: str, message: str, *args: object, **kwargs: object) -> None:
@ -63,31 +112,67 @@ class TibberPricesDataFetcher:
getattr(_LOGGER, level)(prefixed_message, *args, **kwargs)
async def load_cache(self) -> None:
"""Load cached data from storage."""
"""Load cached user data from storage (price data is in IntervalPool)."""
cache_data = await cache.load_cache(self._store, self._log_prefix, time=self.time)
self._cached_price_data = cache_data.price_data
self._cached_user_data = cache_data.user_data
self._last_price_update = cache_data.last_price_update
self._last_user_update = cache_data.last_user_update
# Parse timestamps if we loaded price data from cache
if self._cached_price_data:
self._cached_price_data = helpers.parse_all_timestamps(self._cached_price_data, time=self.time)
def should_fetch_tomorrow_data(
self,
current_price_info: list[dict[str, Any]] | None,
) -> bool:
"""
Determine if tomorrow's data should be requested from the API.
# Validate cache: check if price data is from a previous day
if not cache.is_cache_valid(cache_data, self._log_prefix, time=self.time):
self._log("info", "Cached price data is from a previous day, clearing cache to fetch fresh data")
self._cached_price_data = None
self._last_price_update = None
await self.store_cache()
This is the key intelligence that prevents API spam:
- Tibber publishes tomorrow's prices around 13:00 each day
- Before 13:00, requesting tomorrow data will always fail → wasted API call
- If we already have tomorrow data, no need to request it again
The decision logic:
1. Before 13:00 local time → Don't fetch (data not available yet)
2. After 13:00 AND tomorrow data already present → Don't fetch (already have it)
3. After 13:00 AND tomorrow data missing → Fetch (data should be available)
Args:
current_price_info: List of price intervals from current coordinator data.
Used to check if tomorrow data already exists.
Returns:
True if tomorrow data should be requested, False otherwise.
"""
now = self.time.now()
now_local = self.time.as_local(now)
current_hour = now_local.hour
# Before TOMORROW_DATA_AVAILABLE_HOUR - tomorrow data not available yet
if current_hour < TOMORROW_DATA_AVAILABLE_HOUR:
self._log("debug", "Before %d:00 - not requesting tomorrow data", TOMORROW_DATA_AVAILABLE_HOUR)
return False
# After TOMORROW_DATA_AVAILABLE_HOUR - check if we already have tomorrow data
if current_price_info:
has_tomorrow = self.has_tomorrow_data(current_price_info)
if has_tomorrow:
self._log(
"debug", "After %d:00 but already have tomorrow data - not requesting", TOMORROW_DATA_AVAILABLE_HOUR
)
return False
self._log("debug", "After %d:00 and tomorrow data missing - will request", TOMORROW_DATA_AVAILABLE_HOUR)
return True
# No current data - request tomorrow data if after TOMORROW_DATA_AVAILABLE_HOUR
self._log(
"debug", "After %d:00 with no current data - will request tomorrow data", TOMORROW_DATA_AVAILABLE_HOUR
)
return True
async def store_cache(self, last_midnight_check: datetime | None = None) -> None:
"""Store cache data."""
"""Store cache data (user metadata only, price data is in IntervalPool)."""
cache_data = cache.TibberPricesCacheData(
price_data=self._cached_price_data,
user_data=self._cached_user_data,
last_price_update=self._last_price_update,
last_user_update=self._last_user_update,
last_midnight_check=last_midnight_check,
)
@ -196,57 +281,23 @@ class TibberPricesDataFetcher:
return True # User data was updated
return False # No update needed
@callback
def should_update_price_data(self, current_time: datetime) -> bool | str:
async def fetch_home_data(
self,
home_id: str,
current_time: datetime,
*,
include_tomorrow: bool = True,
) -> dict[str, Any]:
"""
Check if price data should be updated from the API.
Fetch data for a single home via pool.
API calls only happen when truly needed:
1. No cached data exists
2. Cache is invalid (from previous day - detected by _is_cache_valid)
3. After 13:00 local time and tomorrow's data is missing or invalid
Cache validity is ensured by:
- _is_cache_valid() checks date mismatch on load
- Midnight turnover clears cache (Timer #2)
- Tomorrow data validation after 13:00
No periodic "safety" updates - trust the cache validation!
Returns:
bool or str: True for immediate update, "tomorrow_check" for tomorrow
data check (needs random delay), False for no update
Args:
home_id: Home ID to fetch data for.
current_time: Current time for timestamp in result.
include_tomorrow: If True, request tomorrow's data too. If False,
only request up to end of today.
"""
if self._cached_price_data is None:
self._log("debug", "API update needed: No cached price data")
return True
if self._last_price_update is None:
self._log("debug", "API update needed: No last price update timestamp")
return True
# Check if after 13:00 and tomorrow data is missing or invalid
now_local = self.time.as_local(current_time)
if now_local.hour >= TOMORROW_DATA_CHECK_HOUR and self._cached_price_data and self.needs_tomorrow_data():
self._log(
"info",
"API update needed: After %s:00 and tomorrow's data missing/invalid",
TOMORROW_DATA_CHECK_HOUR,
)
# Return special marker to indicate this is a tomorrow data check
# Caller should add random delay to spread load
return "tomorrow_check"
# No update needed - cache is valid and complete
self._log("debug", "No API update needed: Cache is valid and complete")
return False
def needs_tomorrow_data(self) -> bool:
"""Check if tomorrow data is missing or invalid."""
return helpers.needs_tomorrow_data(self._cached_price_data)
async def fetch_home_data(self, home_id: str, current_time: datetime) -> dict[str, Any]:
"""Fetch data for a single home."""
if not home_id:
self._log("warning", "No home ID provided - cannot fetch price data")
return {
@ -279,24 +330,17 @@ class TibberPricesDataFetcher:
self._log("error", msg)
raise TibberPricesApiClientError(msg) from ex
# Get price data for this home
# Pass user_data for timezone-aware cursor calculation
# At this point, _cached_user_data is guaranteed to be not None (checked above)
if not self._cached_user_data:
msg = "User data unexpectedly None after fetch attempt"
raise TibberPricesApiClientError(msg)
self._log("debug", "Fetching price data for home %s", home_id)
home_data = await self.api.async_get_price_info(
home_id=home_id,
user_data=self._cached_user_data,
)
# Retrieve price data via IntervalPool (single source of truth)
price_info = await self._fetch_via_pool(home_id, include_tomorrow=include_tomorrow)
# Extract currency for this home from user_data
currency = self._get_currency_for_home(home_id)
price_info = home_data.get("price_info", [])
self._log("debug", "Successfully fetched data for home %s (%d intervals)", home_id, len(price_info))
return {
@ -306,6 +350,50 @@ class TibberPricesDataFetcher:
"currency": currency,
}
async def _fetch_via_pool(
self,
home_id: str,
*,
include_tomorrow: bool = True,
) -> list[dict[str, Any]]:
"""
Retrieve price data via IntervalPool.
The IntervalPool is the single source of truth for price data:
- Handles actual API calls to Tibber
- Manages deduplication and caching
- Provides intervals from day-before-yesterday to end-of-today/tomorrow
This method delegates to the Pool's get_sensor_data() which returns
all relevant intervals for sensor display.
Args:
home_id: Home ID (currently unused, Pool knows its home).
include_tomorrow: If True, request tomorrow's data too. If False,
only request up to end of today. This prevents
API spam before 13:00 when Tibber doesn't have
tomorrow data yet.
Returns:
List of price interval dicts.
"""
# user_data is guaranteed by fetch_home_data(), but needed for type narrowing
if self._cached_user_data is None:
return []
self._log(
"debug",
"Retrieving price data for home %s via interval pool (include_tomorrow=%s)",
home_id,
include_tomorrow,
)
return await self._interval_pool.get_sensor_data(
api_client=self.api,
user_data=self._cached_user_data,
include_tomorrow=include_tomorrow,
)
def _get_currency_for_home(self, home_id: str) -> str:
"""
Get currency for a specific home from cached user_data.
@ -373,8 +461,31 @@ class TibberPricesDataFetcher:
current_time: datetime,
home_id: str,
transform_fn: Callable[[dict[str, Any]], dict[str, Any]],
*,
current_price_info: list[dict[str, Any]] | None = None,
) -> dict[str, Any]:
"""Handle update for main entry - fetch data for this home."""
"""
Handle update for main entry - fetch data for this home.
The IntervalPool is the single source of truth for price data:
- It handles API fetching, deduplication, and caching internally
- We decide WHEN to fetch tomorrow data (after 13:00, if not already present)
- This prevents API spam before 13:00 when Tibber doesn't have tomorrow data
This method:
1. Updates user data if needed (daily)
2. Determines if tomorrow data should be requested
3. Fetches price data via IntervalPool
4. Transforms result for coordinator
Args:
current_time: Current time for update decisions.
home_id: Home ID to fetch data for.
transform_fn: Function to transform raw data for coordinator.
current_price_info: Current price intervals (from coordinator.data["priceInfo"]).
Used to check if tomorrow data already exists.
"""
# Update user data if needed (daily check)
user_data_updated = await self.update_user_data_if_needed(current_time)
@ -384,84 +495,50 @@ class TibberPricesDataFetcher:
if not home_exists:
self._log("warning", "Home ID %s not found in Tibber account", home_id)
# Return a special marker in the result that coordinator can check
# We still need to return valid data to avoid coordinator errors
result = transform_fn(self._cached_price_data or {})
result = transform_fn({})
result["_home_not_found"] = True # Special marker for coordinator
return result
# Check if we need to update price data
should_update = self.should_update_price_data(current_time)
# Determine if we should request tomorrow data
include_tomorrow = self.should_fetch_tomorrow_data(current_price_info)
if should_update:
# If this is a tomorrow data check, add random delay to spread API load
if should_update == "tomorrow_check":
# Use secrets for better randomness distribution
delay = secrets.randbelow(TOMORROW_DATA_RANDOM_DELAY_MAX + 1)
self._log(
"debug",
"Tomorrow data check - adding random delay of %d seconds to spread load",
delay,
)
await asyncio.sleep(delay)
# Fetch price data via IntervalPool
self._log(
"debug",
"Fetching price data for home %s via interval pool (include_tomorrow=%s)",
home_id,
include_tomorrow,
)
raw_data = await self.fetch_home_data(home_id, current_time, include_tomorrow=include_tomorrow)
self._log("debug", "Fetching fresh price data from API")
raw_data = await self.fetch_home_data(home_id, current_time)
# Parse timestamps immediately after API fetch
raw_data = helpers.parse_all_timestamps(raw_data, time=self.time)
# Cache the data (now with datetime objects)
self._cached_price_data = raw_data
self._last_price_update = current_time
# Parse timestamps immediately after fetch
raw_data = helpers.parse_all_timestamps(raw_data, time=self.time)
# Store user data cache (price data persisted by IntervalPool)
if user_data_updated:
await self.store_cache()
# Transform for main entry
return transform_fn(raw_data)
# Use cached data if available
if self._cached_price_data is not None:
# If user data was updated, we need to return transformed data to trigger entity updates
# This ensures diagnostic sensors (home_type, grid_company, etc.) get refreshed
if user_data_updated:
self._log("debug", "User data updated - returning transformed data to update diagnostic sensors")
else:
self._log("debug", "Using cached price data (no API call needed)")
return transform_fn(self._cached_price_data)
# Fallback: no cache and no update needed (shouldn't happen)
self._log("warning", "No cached data available and update not triggered - returning empty data")
return {
"timestamp": current_time,
"home_id": home_id,
"priceInfo": [],
"currency": "",
}
# Transform for main entry
return transform_fn(raw_data)
async def handle_api_error(
self,
error: Exception,
transform_fn: Callable[[dict[str, Any]], dict[str, Any]],
) -> dict[str, Any]:
"""Handle API errors with fallback to cached data."""
) -> None:
"""
Handle API errors - re-raise appropriate exceptions.
Note: With IntervalPool as source of truth, there's no local price cache
to fall back to. The Pool has its own persistence, so on next update
it will use its cached intervals if API is unavailable.
"""
if isinstance(error, TibberPricesApiClientAuthenticationError):
msg = "Invalid access token"
raise ConfigEntryAuthFailed(msg) from error
# Use cached data as fallback if available
if self._cached_price_data is not None:
self._log("warning", "API error, using cached data: %s", error)
return transform_fn(self._cached_price_data)
msg = f"Error communicating with API: {error}"
raise UpdateFailed(msg) from error
@property
def cached_price_data(self) -> dict[str, Any] | None:
"""Get cached price data."""
return self._cached_price_data
@cached_price_data.setter
def cached_price_data(self, value: dict[str, Any] | None) -> None:
"""Set cached price data."""
self._cached_price_data = value
@property
def cached_user_data(self) -> dict[str, Any] | None:
"""Get cached user data."""

View file

@ -26,8 +26,8 @@ import pytest
from custom_components.tibber_prices.api.exceptions import TibberPricesApiClientError
from custom_components.tibber_prices.api.helpers import flatten_price_info
from custom_components.tibber_prices.coordinator.data_fetching import (
TibberPricesDataFetcher,
from custom_components.tibber_prices.coordinator.price_data_manager import (
TibberPricesPriceDataManager,
)
@ -52,16 +52,23 @@ def mock_store() -> Mock:
return Mock()
@pytest.fixture
def mock_interval_pool() -> Mock:
"""Create a mock interval pool."""
return Mock()
@pytest.mark.unit
def test_validate_user_data_complete(mock_api_client, mock_time_service, mock_store) -> None: # noqa: ANN001
def test_validate_user_data_complete(mock_api_client, mock_time_service, mock_store, mock_interval_pool) -> None: # noqa: ANN001
"""Test that complete user data passes validation."""
fetcher = TibberPricesDataFetcher(
price_data_manager = TibberPricesPriceDataManager(
api=mock_api_client,
store=mock_store,
log_prefix="[Test]",
user_update_interval=timedelta(days=1),
time=mock_time_service,
home_id="home-123",
interval_pool=mock_interval_pool,
)
user_data = {
@ -82,19 +89,22 @@ def test_validate_user_data_complete(mock_api_client, mock_time_service, mock_st
}
}
assert fetcher._validate_user_data(user_data, "home-123") is True # noqa: SLF001 # noqa: SLF001
assert price_data_manager._validate_user_data(user_data, "home-123") is True # noqa: SLF001 # noqa: SLF001
@pytest.mark.unit
def test_validate_user_data_none_subscription(mock_api_client, mock_time_service, mock_store) -> None: # noqa: ANN001
def test_validate_user_data_none_subscription(
mock_api_client: Mock, mock_time_service: Mock, mock_store: Mock, mock_interval_pool: Mock
) -> None:
"""Test that user data without subscription (but with timezone) passes validation."""
fetcher = TibberPricesDataFetcher(
price_data_manager = TibberPricesPriceDataManager(
api=mock_api_client,
store=mock_store,
log_prefix="[Test]",
user_update_interval=timedelta(days=1),
time=mock_time_service,
home_id="home-123",
interval_pool=mock_interval_pool,
)
user_data = {
@ -110,19 +120,22 @@ def test_validate_user_data_none_subscription(mock_api_client, mock_time_service
}
# Should pass validation - timezone is present, subscription being None is valid
assert fetcher._validate_user_data(user_data, "home-123") is True # noqa: SLF001 # noqa: SLF001
assert price_data_manager._validate_user_data(user_data, "home-123") is True # noqa: SLF001 # noqa: SLF001
@pytest.mark.unit
def test_validate_user_data_missing_timezone(mock_api_client, mock_time_service, mock_store) -> None: # noqa: ANN001
def test_validate_user_data_missing_timezone(
mock_api_client: Mock, mock_time_service: Mock, mock_store: Mock, mock_interval_pool: Mock
) -> None:
"""Test that user data without timezone fails validation."""
fetcher = TibberPricesDataFetcher(
price_data_manager = TibberPricesPriceDataManager(
api=mock_api_client,
store=mock_store,
log_prefix="[Test]",
user_update_interval=timedelta(days=1),
time=mock_time_service,
home_id="home-123",
interval_pool=mock_interval_pool,
)
user_data = {
@ -143,19 +156,22 @@ def test_validate_user_data_missing_timezone(mock_api_client, mock_time_service,
}
}
assert fetcher._validate_user_data(user_data, "home-123") is False # noqa: SLF001 # noqa: SLF001
assert price_data_manager._validate_user_data(user_data, "home-123") is False # noqa: SLF001 # noqa: SLF001
@pytest.mark.unit
def test_validate_user_data_subscription_without_currency(mock_api_client, mock_time_service, mock_store) -> None: # noqa: ANN001
def test_validate_user_data_subscription_without_currency(
mock_api_client: Mock, mock_time_service: Mock, mock_store: Mock, mock_interval_pool: Mock
) -> None:
"""Test that user data with subscription but no currency fails validation."""
fetcher = TibberPricesDataFetcher(
price_data_manager = TibberPricesPriceDataManager(
api=mock_api_client,
store=mock_store,
log_prefix="[Test]",
user_update_interval=timedelta(days=1),
time=mock_time_service,
home_id="home-123",
interval_pool=mock_interval_pool,
)
user_data = {
@ -174,19 +190,20 @@ def test_validate_user_data_subscription_without_currency(mock_api_client, mock_
}
}
assert fetcher._validate_user_data(user_data, "home-123") is False # noqa: SLF001 # noqa: SLF001
assert price_data_manager._validate_user_data(user_data, "home-123") is False # noqa: SLF001 # noqa: SLF001
@pytest.mark.unit
def test_validate_user_data_home_not_found(mock_api_client, mock_time_service, mock_store) -> None: # noqa: ANN001
def test_validate_user_data_home_not_found(mock_api_client, mock_time_service, mock_store, mock_interval_pool) -> None: # noqa: ANN001
"""Test that user data without the requested home fails validation."""
fetcher = TibberPricesDataFetcher(
price_data_manager = TibberPricesPriceDataManager(
api=mock_api_client,
store=mock_store,
log_prefix="[Test]",
user_update_interval=timedelta(days=1),
time=mock_time_service,
home_id="home-123",
interval_pool=mock_interval_pool,
)
user_data = {
@ -200,39 +217,45 @@ def test_validate_user_data_home_not_found(mock_api_client, mock_time_service, m
}
}
assert fetcher._validate_user_data(user_data, "home-123") is False # noqa: SLF001 # noqa: SLF001
assert price_data_manager._validate_user_data(user_data, "home-123") is False # noqa: SLF001 # noqa: SLF001
@pytest.mark.unit
def test_get_currency_raises_on_no_cached_data(mock_api_client, mock_time_service, mock_store) -> None: # noqa: ANN001
def test_get_currency_raises_on_no_cached_data(
mock_api_client: Mock, mock_time_service: Mock, mock_store: Mock, mock_interval_pool: Mock
) -> None:
"""Test that _get_currency_for_home raises exception when no data cached."""
fetcher = TibberPricesDataFetcher(
price_data_manager = TibberPricesPriceDataManager(
api=mock_api_client,
store=mock_store,
log_prefix="[Test]",
user_update_interval=timedelta(days=1),
time=mock_time_service,
home_id="home-123",
interval_pool=mock_interval_pool,
)
# No cached data
with pytest.raises(TibberPricesApiClientError, match="No user data cached"):
fetcher._get_currency_for_home("home-123") # noqa: SLF001 # noqa: SLF001
price_data_manager._get_currency_for_home("home-123") # noqa: SLF001 # noqa: SLF001
@pytest.mark.unit
def test_get_currency_raises_on_no_subscription(mock_api_client, mock_time_service, mock_store) -> None: # noqa: ANN001
def test_get_currency_raises_on_no_subscription(
mock_api_client: Mock, mock_time_service: Mock, mock_store: Mock, mock_interval_pool: Mock
) -> None:
"""Test that _get_currency_for_home raises exception when home has no subscription."""
fetcher = TibberPricesDataFetcher(
price_data_manager = TibberPricesPriceDataManager(
api=mock_api_client,
store=mock_store,
log_prefix="[Test]",
user_update_interval=timedelta(days=1),
time=mock_time_service,
home_id="home-123",
interval_pool=mock_interval_pool,
)
fetcher._cached_user_data = { # noqa: SLF001 # noqa: SLF001
price_data_manager._cached_user_data = { # noqa: SLF001 # noqa: SLF001
"viewer": {
"homes": [
{
@ -244,22 +267,25 @@ def test_get_currency_raises_on_no_subscription(mock_api_client, mock_time_servi
}
with pytest.raises(TibberPricesApiClientError, match="has no active subscription"):
fetcher._get_currency_for_home("home-123") # noqa: SLF001 # noqa: SLF001
price_data_manager._get_currency_for_home("home-123") # noqa: SLF001 # noqa: SLF001
@pytest.mark.unit
def test_get_currency_extracts_valid_currency(mock_api_client, mock_time_service, mock_store) -> None: # noqa: ANN001
def test_get_currency_extracts_valid_currency(
mock_api_client: Mock, mock_time_service: Mock, mock_store: Mock, mock_interval_pool: Mock
) -> None:
"""Test that _get_currency_for_home successfully extracts currency."""
fetcher = TibberPricesDataFetcher(
price_data_manager = TibberPricesPriceDataManager(
api=mock_api_client,
store=mock_store,
log_prefix="[Test]",
user_update_interval=timedelta(days=1),
time=mock_time_service,
home_id="home-123",
interval_pool=mock_interval_pool,
)
fetcher._cached_user_data = { # noqa: SLF001 # noqa: SLF001
price_data_manager._cached_user_data = { # noqa: SLF001 # noqa: SLF001
"viewer": {
"homes": [
{
@ -276,7 +302,7 @@ def test_get_currency_extracts_valid_currency(mock_api_client, mock_time_service
}
}
assert fetcher._get_currency_for_home("home-123") == "NOK" # noqa: SLF001 # noqa: SLF001
assert price_data_manager._get_currency_for_home("home-123") == "NOK" # noqa: SLF001 # noqa: SLF001
@pytest.mark.unit