Compare commits

...

5 commits

Author SHA1 Message Date
Julian Pawlowski
bb8f5aa8cc chore(testing): add optional Pyright checks for tests
Some checks failed
Validate / HACS validation (push) Has been cancelled
Lint / Ruff (push) Has been cancelled
Validate / Hassfest validation (push) Has been cancelled
Add a dedicated type-check-tests helper, wire it into check-all behind --with-test-types, and align the affected tests with current typing and helper contracts.

Impact: No direct user-facing change.

User-Impact: none
2026-04-25 22:46:43 +00:00
Julian Pawlowski
bbcfdd4443 fix(periods): stabilize best and peak period outputs
Recompute merged relaxed periods from raw intervals, harden numeric period option normalization, update day-volatility handling for zero or negative averages, and expose day context on period binary sensors.

Add focused regressions for overlap merges, cache invalidation, day statistics, and visible binary sensor attributes.

Impact: Best and peak period entities stay consistent on negative-price days, refresh correctly when same-day prices change, and expose the documented day context attributes.
2026-04-25 22:46:38 +00:00
Julian Pawlowski
10c83d6720 fix(periods): keep negative best-price windows strictly local
Always treat prices at or below zero as valid best-price intervals, rescue short
negative cores with directly adjacent cheap shoulders before min-length filtering,
and block geometric or shape-based widening for periods that already contain a
negative-price core.

Impact: Negative best-price periods no longer expand into positive edge intervals on days with extreme negative prices.
2026-04-25 20:00:04 +00:00
Julian Pawlowski
c8f40e0b8a Merge branch 'main' of https://github.com/jpawlowski/hass.tibber_prices 2026-04-25 18:36:05 +00:00
Julian Pawlowski
870b716681 feat(settings): add local settings for script permissions
Introduce a new settings file to define permissions for various script executions.

Impact: Enables controlled execution of linting, type-checking, and testing scripts.
2026-04-25 18:36:02 +00:00
26 changed files with 1738 additions and 174 deletions

View file

@ -0,0 +1,11 @@
{
"permissions": {
"allow": [
"Bash(./scripts/lint-check)",
"Bash(./scripts/type-check)",
"Bash(./scripts/test tests/services/test_plan_charging.py tests/services/test_energy_calculator.py tests/services/test_power_scheduler.py)",
"Bash(./scripts/test)",
"Bash(./scripts/check)"
]
}
}

View file

@ -283,6 +283,22 @@ def add_price_attributes(attributes: dict, current_period: dict, factor: int) ->
attributes["volatility"] = current_period["volatility"] # Volatility is not a price, keep as-is
def add_day_statistics_attributes(attributes: dict, current_period: dict) -> None:
"""Add per-day context attributes for the current/next period.
Day price range fields are already stored in minor currency units (ct/ore)
by the period summary builder and therefore must not be converted again here.
"""
if "day_volatility_%" in current_period:
attributes["day_volatility_%"] = current_period["day_volatility_%"]
if "day_price_min" in current_period:
attributes["day_price_min"] = current_period["day_price_min"]
if "day_price_max" in current_period:
attributes["day_price_max"] = current_period["day_price_max"]
if "day_price_span" in current_period:
attributes["day_price_span"] = current_period["day_price_span"]
def add_comparison_attributes(attributes: dict, current_period: dict, factor: int) -> None:
"""
Add price comparison attributes (priority 4).
@ -473,12 +489,13 @@ def build_final_attributes_simple(
2. Core decision attributes (level, rating_level, rating_difference_%)
3. Price statistics (price_mean, price_median, price_min, price_max, price_spread, volatility)
4. Price differences (period_price_diff_from_daily_min, period_price_diff_from_daily_min_%)
5. Detail information (period_interval_count, period_position, period_count_total, period_count_remaining)
6. Relaxation information (relaxation_active, relaxation_level, relaxation_threshold_original_%,
5. Day context (day_volatility_%, day_price_min, day_price_max, day_price_span)
6. Detail information (period_interval_count, period_position, period_count_total, period_count_remaining)
7. Relaxation information (relaxation_active, relaxation_level, relaxation_threshold_original_%,
relaxation_threshold_applied_%) - only if current period was relaxed
7. Calculation summary (min_periods_configured, flat_days_detected,
8. Calculation summary (min_periods_configured, flat_days_detected,
relaxation_incomplete) - diagnostic info about the overall calculation
8. Meta information (periods list)
9. Meta information (periods list)
Args:
current_period: The current or next period (already complete from coordinator)
@ -514,20 +531,23 @@ def build_final_attributes_simple(
# 4. Price differences (converted to display units)
add_comparison_attributes(attributes, current_period, factor)
# 5. Detail information
# 5. Day context attributes (already in minor units)
add_day_statistics_attributes(attributes, current_period)
# 6. Detail information
add_detail_attributes(attributes, current_period)
# 5.5 Per-day period counts (how many cheap/peak periods per day)
# 6.5 Per-day period counts (how many cheap/peak periods per day)
add_period_count_attributes(attributes, period_summaries, time)
# 6. Relaxation information (only if current period was relaxed)
# 7. Relaxation information (only if current period was relaxed)
add_relaxation_attributes(attributes, current_period)
# 7. Calculation summary (diagnostic: min_periods_configured, flat_days_detected, etc.)
# 8. Calculation summary (diagnostic: min_periods_configured, flat_days_detected, etc.)
if period_metadata:
add_calculation_summary_attributes(attributes, period_metadata)
# 8. Meta information (periods array - prices converted to display units)
# 9. Meta information (periods array - prices converted to display units)
attributes["periods"] = _convert_periods_to_display_units(period_summaries, factor)
return attributes

View file

@ -101,13 +101,19 @@ class PeriodSummary(TypedDict, total=False):
period_price_diff_from_daily_min: float # Difference from daily min
period_price_diff_from_daily_min_pct: float # Difference from daily min (%)
# Detail information (priority 5)
# Day context (priority 5)
day_volatility_pct: float | None # Volatility of the period's day (%), None for zero-average days
day_price_min: float # Daily minimum price in minor currency (ct/ore)
day_price_max: float # Daily maximum price in minor currency (ct/ore)
day_price_span: float # Daily price span in minor currency (ct/ore)
# Detail information (priority 6)
period_interval_count: int # Number of intervals in period
period_position: int # Period position (1-based)
period_count_total: int # Total number of periods
period_count_remaining: int # Remaining periods after this one
# Relaxation information (priority 6 - only if period was relaxed)
# Relaxation information (priority 7 - only if period was relaxed)
relaxation_active: bool # Whether this period was found via relaxation
relaxation_level: int # Relaxation level used (1-based)
relaxation_threshold_original_pct: float # Original flex threshold (%)
@ -125,9 +131,10 @@ class PeriodAttributes(BaseAttributes, total=False):
2. Core decision attributes (level, rating_level, rating_difference_%)
3. Price statistics (price_mean, price_median, price_min, price_max, price_spread, volatility)
4. Price comparison (period_price_diff_from_daily_min, period_price_diff_from_daily_min_%)
5. Detail information (period_interval_count, period_position, period_count_total, period_count_remaining)
6. Relaxation information (only if period was relaxed)
7. Meta information (periods list)
5. Day context (day_volatility_%, day_price_min, day_price_max, day_price_span)
6. Detail information (period_interval_count, period_position, period_count_total, period_count_remaining)
7. Relaxation information (only if period was relaxed)
8. Meta information (periods list)
"""
# Time information (priority 1) - start/end refer to current/next period
@ -152,19 +159,25 @@ class PeriodAttributes(BaseAttributes, total=False):
period_price_diff_from_daily_min: float # Difference from daily min
period_price_diff_from_daily_min_pct: float # Difference from daily min (%)
# Detail information (priority 5)
# Day context (priority 5)
day_volatility_pct: float | None # Volatility of the period's day (%), None for zero-average days
day_price_min: float # Daily minimum price in minor currency (ct/ore)
day_price_max: float # Daily maximum price in minor currency (ct/ore)
day_price_span: float # Daily price span in minor currency (ct/ore)
# Detail information (priority 6)
period_interval_count: int # Number of intervals in current/next period
period_position: int # Period position (1-based)
period_count_total: int # Total number of periods found
period_count_remaining: int # Remaining periods after current/next one
# Relaxation information (priority 6 - only if period was relaxed)
# Relaxation information (priority 7 - only if period was relaxed)
relaxation_active: bool # Whether current/next period was found via relaxation
relaxation_level: int # Relaxation level used (1-based)
relaxation_threshold_original_pct: float # Original flex threshold (%)
relaxation_threshold_applied_pct: float # Applied flex threshold after relaxation (%)
# Meta information (priority 7)
# Meta information (priority 8)
periods: list[PeriodSummary] # All periods found (sorted by start time)

View file

@ -871,7 +871,7 @@ class TibberPricesDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
return self._data_transformer.get_threshold_percentages()
def _calculate_periods_for_price_info(
self, price_info: dict[str, Any], day_patterns: dict[str, Any] | None = None
self, price_info: list[dict[str, Any]], day_patterns: dict[str, Any] | None = None
) -> dict[str, Any]:
"""Calculate periods (best price and peak price) for the given price info."""
return self._period_calculator.calculate_periods_for_price_info(price_info, day_patterns)

View file

@ -28,7 +28,7 @@ class TibberPricesDataTransformer:
self,
config_entry: ConfigEntry,
log_prefix: str,
calculate_periods_fn: Callable[[dict[str, Any], dict[str, Any] | None], dict[str, Any]],
calculate_periods_fn: Callable[[list[dict[str, Any]], dict[str, Any] | None], dict[str, Any]],
time: TibberPricesTimeService,
) -> None:
"""Initialize the data transformer."""

View file

@ -16,6 +16,7 @@ from .period_building import (
add_interval_ends,
build_periods,
calculate_reference_prices,
extend_negative_core_periods_for_min_length,
extend_periods_across_midnight,
filter_periods_by_end_date,
filter_periods_by_min_length,
@ -201,6 +202,17 @@ def calculate_periods(
len(raw_periods),
)
# Step 3.75: Rescue short negative best-price cores before min-length filtering.
# This keeps <= 0 prices as the hard core and only adds directly adjacent cheap
# shoulders when needed to reach the configured minimum length.
if not reverse_sort:
raw_periods = extend_negative_core_periods_for_min_length(
raw_periods,
all_prices_sorted,
min_period_length,
time=time,
)
# Step 4: Filter by minimum length
raw_periods = filter_periods_by_min_length(raw_periods, min_period_length, time=time)
_LOGGER.debug(

View file

@ -132,6 +132,17 @@ def check_interval_criteria(
Tuple of (in_flex, meets_min_distance)
"""
# ============================================================
# FAST PATH: Negative/zero prices always qualify as best price
# ============================================================
# When price ≤ 0 the consumer is paid or gets free electricity.
# This is unconditionally the cheapest possible outcome regardless
# of daily average, flex setting, or level filter.
# Bypasses both flex AND min_distance: a negative price is always
# maximally "far below average" in the economically meaningful sense.
if not criteria.reverse_sort and price <= 0:
return True, True
# Normalize inputs to absolute values for consistent calculation
flex_abs = abs(criteria.flex)
min_distance_abs = abs(criteria.min_distance_from_avg)
@ -143,22 +154,19 @@ def check_interval_criteria(
# - Peak price (reverse_sort=True): daily MAXIMUM
# - Best price (reverse_sort=False): daily MINIMUM
#
# Standard formula (positive daily minimum):
# Flex base = max(price_span, abs(ref_price)):
# - On V-shape days (tiny minimum, large span): span wins → meaningful flex band
# - On flat days (large minimum, small span): ref_price wins → same as before
#
# WHY NOT plain ref_price * flex: When daily_min is a single low outlier
# (e.g., min=1 ct, avg=19 ct), the flex band collapses to near-zero
# (1 ct * 15% = 0.15 ct) and no period of sufficient length can be found.
#
# WHY NOT plain span * flex: On flat days (e.g., min=30 ct, span=3 ct),
# this makes the band much narrower than before, breaking existing behaviour.
#
# Examples with flex=15%:
# - V-shape: min=1 ct, avg=19 ct → span=18 ct → flex_base=18 → threshold=1+2.7=3.7 ct (spans fixed)
# - Flat: min=30 ct, avg=33 ct → span=3 ct → flex_base=30 → threshold=30+4.5=34.5 ct (unchanged)
# - Normal: min=10 ct, avg=20 ct → span=10 ct → flex_base=10 → threshold=10+1.5=11.5 ct (unchanged)
# Examples with flex=15% (positive minimum):
# - V-shape: min=1 ct, avg=19 ct → span=18 ct → flex_base=18 → threshold=1+2.7=3.7 ct
# - Flat: min=30 ct, avg=33 ct → span=3 ct → flex_base=30 → threshold=30+4.5=34.5 ct
# - Normal: min=10 ct, avg=20 ct → span=10 ct → flex_base=10 → threshold=10+1.5=11.5 ct
# Positive shoulders around a short negative core are handled later in the
# raw-period pipeline, where adjacency can be evaluated locally. Keeping the
# interval filter day-agnostic avoids creating a global halo across the whole day.
price_span = abs(criteria.avg_price - criteria.ref_price)
flex_base = max(price_span, abs(criteria.ref_price))

View file

@ -6,7 +6,7 @@ from datetime import date, datetime, timedelta
import logging
from typing import TYPE_CHECKING, Any
from custom_components.tibber_prices.const import PRICE_LEVEL_MAPPING
from custom_components.tibber_prices.const import PRICE_LEVEL_CHEAP, PRICE_LEVEL_MAPPING, PRICE_LEVEL_VERY_CHEAP
if TYPE_CHECKING:
from custom_components.tibber_prices.coordinator.time_service import TibberPricesTimeService
@ -19,6 +19,7 @@ _LOGGER_DETAILS = logging.getLogger(__name__ + ".details")
# Module-local log indentation (each module starts at level 0)
INDENT_L0 = "" # Entry point / main function
NEGATIVE_CORE_NO_SHOULDER_INTERVALS = 8 # 2 hours at 15-min resolution
def split_intervals_by_day(
@ -66,6 +67,182 @@ def _trim_trailing_gaps(period: list[dict]) -> list[dict]:
return period
def _build_period_interval(price_data: dict, *, time: TibberPricesTimeService) -> dict | None:
"""Build the internal interval representation used by raw periods."""
starts_at = time.get_interval_time(price_data)
if starts_at is None:
return None
price_original = float(price_data.get("_original_price", price_data["total"]))
return {
"interval_hour": starts_at.hour,
"interval_minute": starts_at.minute,
"interval_time": f"{starts_at.hour:02d}:{starts_at.minute:02d}",
"price": price_original,
"interval_start": starts_at,
"smoothing_was_impactful": False,
"is_level_gap": False,
"geometric_bonus_applied": False,
}
def _get_longest_negative_core_length(period: list[dict]) -> int:
"""Return the longest contiguous run of intervals with price <= 0."""
longest = 0
current = 0
for interval in period:
if float(interval.get("price", 0.0)) <= 0:
current += 1
longest = max(longest, current)
else:
current = 0
return longest
def _collect_contiguous_best_price_side(
interval_index: dict[datetime, dict],
start_cursor: datetime,
step: timedelta,
*,
max_intervals: int,
time: TibberPricesTimeService,
) -> list[dict]:
"""Collect directly adjacent favourable intervals on one side of a negative core."""
for target_level in (PRICE_LEVEL_VERY_CHEAP, PRICE_LEVEL_CHEAP):
additions: list[dict] = []
cursor = start_cursor
for _ in range(max_intervals):
price_data = interval_index.get(cursor)
if price_data is None or price_data.get("level") != target_level:
break
period_interval = _build_period_interval(price_data, time=time)
if period_interval is None:
break
additions.append(period_interval)
cursor += step
if additions:
if step < timedelta(0):
additions.reverse()
return additions
return []
def _select_nearest_extensions(
left_candidates: list[dict],
right_candidates: list[dict],
*,
max_total_additions: int,
) -> tuple[list[dict], list[dict]]:
"""Select the nearest left/right additions until the target length is reached."""
left_nearest = list(reversed(left_candidates))
right_nearest = right_candidates.copy()
selected_left_nearest: list[dict] = []
selected_right: list[dict] = []
prefer_left = bool(left_nearest)
while max_total_additions > 0 and (left_nearest or right_nearest):
if prefer_left and left_nearest:
selected_left_nearest.append(left_nearest.pop(0))
max_total_additions -= 1
elif not prefer_left and right_nearest:
selected_right.append(right_nearest.pop(0))
max_total_additions -= 1
elif left_nearest:
selected_left_nearest.append(left_nearest.pop(0))
max_total_additions -= 1
elif right_nearest:
selected_right.append(right_nearest.pop(0))
max_total_additions -= 1
prefer_left = not prefer_left
return list(reversed(selected_left_nearest)), selected_right
def extend_negative_core_periods_for_min_length(
periods: list[list[dict]],
all_prices: list[dict],
min_period_length: int,
*,
time: TibberPricesTimeService,
) -> list[list[dict]]:
"""Locally extend short negative best-price cores into directly adjacent cheap shoulders.
This rescue step is intentionally narrow:
- only periods that already contain a negative/zero core are considered
- only periods shorter than the configured minimum length are extended
- only directly adjacent VERY_CHEAP/CHEAP intervals may be added
- multi-hour negative blocks stay untouched to preserve a strict negative-only period
"""
if not periods:
return periods
min_intervals = time.minutes_to_intervals(min_period_length)
if min_intervals <= 0:
return periods
interval_index: dict[datetime, dict] = {}
for price_data in all_prices:
starts_at = time.get_interval_time(price_data)
if starts_at is not None:
interval_index[starts_at] = price_data
interval_duration = time.get_interval_duration()
extended_periods: list[list[dict]] = []
for period in periods:
negative_core_length = _get_longest_negative_core_length(period)
if (
negative_core_length == 0
or negative_core_length >= NEGATIVE_CORE_NO_SHOULDER_INTERVALS
or len(period) >= min_intervals
):
extended_periods.append(period)
continue
period_start = period[0].get("interval_start")
period_end = period[-1].get("interval_start")
if period_start is None or period_end is None:
extended_periods.append(period)
continue
needed_intervals = min_intervals - len(period)
left_candidates = _collect_contiguous_best_price_side(
interval_index,
period_start - interval_duration,
-interval_duration,
max_intervals=needed_intervals,
time=time,
)
right_candidates = _collect_contiguous_best_price_side(
interval_index,
period_end + interval_duration,
interval_duration,
max_intervals=needed_intervals,
time=time,
)
selected_left, selected_right = _select_nearest_extensions(
left_candidates,
right_candidates,
max_total_additions=needed_intervals,
)
if selected_left or selected_right:
extended_periods.append([*selected_left, *period, *selected_right])
else:
extended_periods.append(period)
return extended_periods
def build_periods(
all_prices: list[dict],
price_context: dict[str, Any],
@ -144,7 +321,6 @@ def build_periods(
)
for day in ref_prices
}
for price_data in all_prices:
starts_at = time.get_interval_time(price_data)
if starts_at is None:
@ -173,9 +349,16 @@ def build_periods(
# Check flex and minimum distance criteria (using smoothed price and interval's own day reference)
criteria = criteria_by_day[ref_date]
# Compute geometric flex bonus if pattern-aware expansion is enabled
# Compute geometric flex bonus if pattern-aware expansion is enabled.
# Best-price days with a negative daily minimum are handled by the dedicated
# negative-core logic; applying a day-wide geometric valley bonus there would
# reintroduce broad positive shoulders around a negative core.
geo_bonus = 0.0
if geometric_extra_flex > 0 and day_patterns_by_date is not None:
if (
geometric_extra_flex > 0
and day_patterns_by_date is not None
and not (not reverse_sort and criteria.ref_price < 0)
):
day_pattern_for_date = day_patterns_by_date.get(ref_date)
geo_bonus = compute_geometric_flex_bonus(
starts_at,

View file

@ -3,11 +3,13 @@
from __future__ import annotations
import logging
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from custom_components.tibber_prices.coordinator.time_service import TibberPricesTimeService
from .types import TibberPricesPeriodConfig
_LOGGER = logging.getLogger(__name__)
_LOGGER_DETAILS = logging.getLogger(__name__ + ".details")
@ -82,9 +84,9 @@ def recalculate_period_metadata(periods: list[dict], *, time: TibberPricesTimeSe
period["period_count_remaining"] = total_periods - position
def merge_adjacent_periods(period1: dict, period2: dict) -> dict:
def _merge_adjacent_periods_from_summaries(period1: dict, period2: dict) -> dict:
"""
Merge two adjacent or overlapping periods into one.
Merge two adjacent or overlapping periods from summary data only.
The newer period's relaxation attributes override the older period's.
Takes the earliest start time and latest end time.
@ -111,14 +113,6 @@ def merge_adjacent_periods(period1: dict, period2: dict) -> dict:
- period_interval_level_gap_count
- period_interval_smoothed_count
Args:
period1: First period (older baseline or relaxed period)
period2: Second period (newer relaxed period with higher flex)
Returns:
Merged period dict with combined time span, recomputed price extremes,
and the newer period's relaxation attributes.
"""
# Take earliest start and latest end
merged_start = min(period1["start"], period2["start"])
@ -205,6 +199,236 @@ def merge_adjacent_periods(period1: dict, period2: dict) -> dict:
return merged
def _build_raw_merge_context(
all_prices: list[dict],
config: TibberPricesPeriodConfig,
*,
time: TibberPricesTimeService,
) -> dict[str, Any] | None:
"""Build reusable context for raw-interval merge recomputation."""
from .period_building import calculate_reference_prices, split_intervals_by_day # noqa: PLC0415
from .types import TibberPricesThresholdConfig # noqa: PLC0415
sorted_prices = sorted(
all_prices,
key=lambda price_data: time.get_interval_time(price_data) or time.now(),
)
interval_lookup: dict[Any, dict] = {}
for price_data in sorted_prices:
if (interval_start := time.get_interval_time(price_data)) is not None:
interval_lookup[interval_start] = price_data
if not interval_lookup:
return None
intervals_by_day, avg_price_by_day = split_intervals_by_day(sorted_prices, time=time)
ref_prices = calculate_reference_prices(intervals_by_day, reverse_sort=config.reverse_sort)
thresholds = TibberPricesThresholdConfig(
threshold_low=config.threshold_low,
threshold_high=config.threshold_high,
threshold_volatility_moderate=config.threshold_volatility_moderate,
threshold_volatility_high=config.threshold_volatility_high,
threshold_volatility_very_high=config.threshold_volatility_very_high,
reverse_sort=config.reverse_sort,
)
return {
"interval_duration": time.get_interval_duration(),
"interval_lookup": interval_lookup,
"price_context": {
"ref_prices": ref_prices,
"avg_prices": avg_price_by_day,
"intervals_by_day": intervals_by_day,
},
"thresholds": thresholds,
}
def _collect_period_price_data(
merged_start: Any,
merged_end: Any,
merge_context: dict[str, Any],
) -> list[dict] | None:
"""Collect the contiguous raw intervals for a merged period span."""
interval_lookup = merge_context["interval_lookup"]
interval_duration = merge_context["interval_duration"]
period_price_data: list[dict] = []
cursor = merged_start
while cursor < merged_end:
if (price_data := interval_lookup.get(cursor)) is None:
return None
period_price_data.append(price_data)
cursor += interval_duration
return period_price_data
def _rebuild_merged_period_from_raw(
period1: dict,
period2: dict,
merge_context: dict[str, Any],
) -> dict | None:
"""Rebuild merged period statistics from the raw interval union."""
from custom_components.tibber_prices.utils.price import ( # noqa: PLC0415
aggregate_period_levels,
aggregate_period_ratings,
calculate_coefficient_of_variation,
calculate_volatility_level,
)
from .period_statistics import ( # noqa: PLC0415
build_period_summary_dict,
calculate_aggregated_rating_difference,
calculate_period_price_diff,
calculate_period_price_statistics,
)
from .types import TibberPricesPeriodData, TibberPricesPeriodStatistics # noqa: PLC0415
merged_start = min(period1["start"], period2["start"])
merged_end = max(period1["end"], period2["end"])
period_price_data = _collect_period_price_data(merged_start, merged_end, merge_context)
if not period_price_data:
return None
thresholds = merge_context["thresholds"]
price_context = merge_context["price_context"]
aggregated_level = aggregate_period_levels(period_price_data)
aggregated_rating = None
if thresholds.threshold_low is not None and thresholds.threshold_high is not None:
aggregated_rating, _ = aggregate_period_ratings(
period_price_data,
thresholds.threshold_low,
thresholds.threshold_high,
)
price_stats = calculate_period_price_statistics(period_price_data)
period_price_diff, period_price_diff_pct = calculate_period_price_diff(
price_stats["price_mean"],
merged_start,
price_context,
)
prices_for_volatility = [float(price_data["total"]) for price_data in period_price_data if "total" in price_data]
period_cv = calculate_coefficient_of_variation(prices_for_volatility)
volatility = calculate_volatility_level(
prices_for_volatility,
threshold_moderate=thresholds.threshold_volatility_moderate,
threshold_high=thresholds.threshold_volatility_high,
threshold_very_high=thresholds.threshold_volatility_very_high,
).lower()
rating_difference_pct = calculate_aggregated_rating_difference(period_price_data)
merged = build_period_summary_dict(
TibberPricesPeriodData(
start_time=merged_start,
end_time=merged_end,
period_length=len(period_price_data),
period_idx=1,
total_periods=1,
),
TibberPricesPeriodStatistics(
aggregated_level=aggregated_level,
aggregated_rating=aggregated_rating,
rating_difference_pct=rating_difference_pct,
price_mean=price_stats["price_mean"],
price_median=price_stats["price_median"],
price_min=price_stats["price_min"],
price_max=price_stats["price_max"],
price_spread=price_stats["price_spread"],
volatility=volatility,
coefficient_of_variation=round(period_cv, 1) if period_cv is not None else None,
period_price_diff=period_price_diff,
period_price_diff_pct=period_price_diff_pct,
),
reverse_sort=thresholds.reverse_sort,
price_context=price_context,
)
if period1.get("relaxation_active") or period2.get("relaxation_active"):
merged["relaxation_active"] = True
for attr in (
"relaxation_level",
"relaxation_threshold_original_%",
"relaxation_threshold_applied_%",
"duration_fallback_active",
"duration_fallback_min_length",
):
if attr in period2:
merged[attr] = period2[attr]
elif attr in period1:
merged[attr] = period1[attr]
for attr in (
"period_interval_level_gap_count",
"period_interval_smoothed_count",
):
total = 0
has_value = False
for period in (period1, period2):
if (value := period.get(attr)) is not None:
total += int(value)
has_value = True
if has_value:
merged[attr] = total
merged["merged_from"] = {
"period1_start": period1["start"].isoformat(),
"period1_end": period1["end"].isoformat(),
"period2_start": period2["start"].isoformat(),
"period2_end": period2["end"].isoformat(),
}
_LOGGER_DETAILS.debug(
"%sMerged periods from raw intervals: %s-%s + %s-%s%s-%s (intervals: %d, mean: %s)",
INDENT_L2,
period1["start"].strftime("%H:%M"),
period1["end"].strftime("%H:%M"),
period2["start"].strftime("%H:%M"),
period2["end"].strftime("%H:%M"),
merged_start.strftime("%H:%M"),
merged_end.strftime("%H:%M"),
merged.get("period_interval_count"),
merged.get("price_mean"),
)
return merged
def merge_adjacent_periods(
period1: dict,
period2: dict,
*,
merge_context: dict[str, Any] | None = None,
) -> dict:
"""
Merge two adjacent or overlapping periods into one.
When raw interval data is available, rebuild the merged summary from the
underlying interval union so medians, CV, ratings, and interval counts stay
exact after overlap resolution. Falls back to the previous summary-based
approximation if the raw slice cannot be recovered.
"""
if merge_context is not None and (recomputed := _rebuild_merged_period_from_raw(period1, period2, merge_context)):
return recomputed
if merge_context is not None:
_LOGGER.debug(
"Falling back to summary-based merge for %s-%s + %s-%s",
period1["start"].strftime("%H:%M"),
period1["end"].strftime("%H:%M"),
period2["start"].strftime("%H:%M"),
period2["end"].strftime("%H:%M"),
)
return _merge_adjacent_periods_from_summaries(period1, period2)
def _check_merge_quality_gate(periods_to_merge: list[tuple[int, dict]], relaxed: dict) -> bool:
"""
Check if merging would create a period that's too heterogeneous.
@ -336,6 +560,10 @@ def _find_adjacent_or_overlapping(relaxed: dict, existing_periods: list[dict]) -
def resolve_period_overlaps(
existing_periods: list[dict],
new_relaxed_periods: list[dict],
*,
all_prices: list[dict] | None = None,
config: TibberPricesPeriodConfig | None = None,
time: TibberPricesTimeService | None = None,
) -> tuple[list[dict], int]:
"""
Resolve overlaps between existing periods and newly found relaxed periods.
@ -355,6 +583,9 @@ def resolve_period_overlaps(
Args:
existing_periods: All previously found periods (baseline + earlier relaxation phases)
new_relaxed_periods: Periods found in current relaxation phase (will be merged if adjacent)
all_prices: Optional raw interval data for exact merged-summary recomputation
config: Optional period config used to rebuild merged summaries from raw data
time: Optional time service for interval alignment during raw recomputation
Returns:
Tuple of (merged_periods, new_periods_count):
@ -378,6 +609,10 @@ def resolve_period_overlaps(
merged = existing_periods.copy()
periods_added = 0
merge_context = None
if all_prices is not None and config is not None and time is not None:
merge_context = _build_raw_merge_context(all_prices, config, time=time)
for relaxed in new_relaxed_periods:
relaxed_start = relaxed["start"]
@ -428,7 +663,7 @@ def resolve_period_overlaps(
# Remove old periods (in reverse order to maintain indices)
for idx, existing in reversed(periods_to_merge):
merged_period = merge_adjacent_periods(existing, merged_period)
merged_period = merge_adjacent_periods(existing, merged_period, merge_context=merge_context)
merged.pop(idx)
# Add the merged result

View file

@ -206,8 +206,10 @@ def build_period_summary_dict(
day_span = day_max - day_min
day_avg = avg_prices.get(period_start_date, sum(day_prices) / len(day_prices))
# Calculate volatility percentage (span / avg * 100)
day_volatility_pct = round((day_span / day_avg * 100), 1) if day_avg > 0 else 0.0
# Calculate volatility percentage relative to the day's absolute average.
# Negative-average days remain meaningful, while true zero-average days
# cannot produce a truthful percentage and therefore return None.
day_volatility_pct = round((day_span / abs(day_avg) * 100), 1) if day_avg != 0 else None
# Convert to minor units (ct/øre) for consistency with other price attributes
summary["day_volatility_%"] = day_volatility_pct

View file

@ -283,6 +283,7 @@ def _try_min_duration_fallback(
*,
config: TibberPricesPeriodConfig,
existing_periods: list[dict],
all_prices: list[dict],
prices_by_day: dict[date, list[dict]],
time: TibberPricesTimeService,
max_relaxation_attempts: int = 0,
@ -438,6 +439,9 @@ def _try_min_duration_fallback(
merged_periods, _new_count = resolve_period_overlaps(
existing_periods,
fallback_periods,
all_prices=all_prices,
config=config,
time=time,
)
recalculate_period_metadata(merged_periods, time=time)
@ -836,6 +840,7 @@ def calculate_periods_with_relaxation(
fallback_result, fallback_metadata = _try_min_duration_fallback(
config=config,
existing_periods=all_periods,
all_prices=all_prices,
prices_by_day=prices_by_day,
time=time,
max_relaxation_attempts=max_relaxation_attempts,
@ -1018,6 +1023,9 @@ def relax_all_prices(
combined, standalone_count = resolve_period_overlaps(
existing_periods=existing_periods,
new_relaxed_periods=new_periods,
all_prices=all_prices,
config=config,
time=time,
)
# Count periods per day with QUALITY GATE check

View file

@ -47,6 +47,7 @@ if TYPE_CHECKING:
from .types import TibberPricesThresholdConfig
_INTERVAL_DURATION = timedelta(minutes=15)
NEGATIVE_CORE_DISABLE_EXTENSION_INTERVALS = 1
def extend_periods_for_shape(
@ -269,6 +270,12 @@ def _extend_period_edges(
# Collect original intervals early needed for the majority gate below.
original_intervals = _collect_original_intervals(start, end, interval_index)
# Negative-price best-price periods use dedicated core/shoulder handling earlier
# in the pipeline. Do not widen them again here just because adjacent intervals
# are labelled VERY_CHEAP/CHEAP.
if not reverse_sort and _contains_negative_core(original_intervals):
return period
# ── walk LEFT (earlier than period start) ─────────────────────────────────
left_cursor = start - _INTERVAL_DURATION
left_additions = _walk_contiguous(interval_index, left_cursor, backward_step, primary_level, max_intervals)
@ -401,3 +408,18 @@ def _collect_original_intervals(
result.append(iv)
cursor += _INTERVAL_DURATION
return result
def _contains_negative_core(intervals: list[dict[str, Any]]) -> bool:
"""Return True when the period contains at least one negative/zero-price interval."""
negative_run = 0
for interval in intervals:
if float(interval.get("total", 0.0)) <= 0:
negative_run += 1
if negative_run >= NEGATIVE_CORE_DISABLE_EXTENSION_INTERVALS:
return True
else:
negative_run = 0
return False

View file

@ -74,6 +74,54 @@ class TibberPricesPeriodCalculator:
section = self.config_entry.options.get(config_section, {})
return section.get(config_key, default)
def _normalize_float_option(
self,
value: Any,
default: float,
*,
option_name: str,
absolute: bool = False,
divisor: float = 1.0,
) -> float:
"""Normalize numeric config values and fall back cleanly on invalid input."""
try:
normalized = float(value)
except TypeError, ValueError:
self._log("warning", "Invalid numeric option %s=%r, using default %s", option_name, value, default)
normalized = float(default)
if absolute:
normalized = abs(normalized)
return normalized / divisor
def _normalize_int_option(
self,
value: Any,
default: int,
*,
option_name: str,
minimum: int | None = None,
) -> int:
"""Normalize integer config values and fall back cleanly on invalid input."""
try:
normalized = int(value)
except TypeError, ValueError:
self._log("warning", "Invalid integer option %s=%r, using default %s", option_name, value, default)
return default
if minimum is not None and normalized < minimum:
self._log(
"warning",
"Out-of-range integer option %s=%r, using default %s",
option_name,
value,
default,
)
return default
return normalized
def _log(self, level: str, message: str, *args: object, **kwargs: object) -> None:
"""Log with calculator-specific prefix."""
prefixed_message = f"{self._log_prefix} {message}"
@ -88,12 +136,12 @@ class TibberPricesPeriodCalculator:
self._last_periods_hash = None
self._log("debug", "Period config cache and calculation cache invalidated")
def _compute_periods_hash(self, price_info: dict[str, Any]) -> str:
def _compute_periods_hash(self, price_info: list[dict[str, Any]]) -> str:
"""
Compute hash of price data and config for period calculation caching.
Only includes data that affects period calculation:
- All interval timestamps and enriched rating levels (yesterday/today/tomorrow)
- Today/tomorrow interval content (timestamps, totals, levels, ratings, differences)
- Period calculation config (flex, min_distance, min_period_length)
- Level filter overrides
@ -101,20 +149,42 @@ class TibberPricesPeriodCalculator:
Hash string for cache key comparison.
"""
# Get today and tomorrow intervals for hash calculation
# CRITICAL: Only today+tomorrow needed in hash because:
# 1. Mitternacht: "today" startsAt changes → cache invalidates
# 2. Tomorrow arrival: "tomorrow" startsAt changes from None → cache invalidates
# 3. Yesterday/day-before-yesterday are static (rating_levels don't change retroactively)
# 4. Using first startsAt as representative (changes → entire day changed)
# Get today and tomorrow intervals for hash calculation.
# Hash full interval signatures instead of only the first startsAt so we also
# invalidate when prices or enriched levels change within the same calendar day.
coordinator_data = {"priceInfo": price_info}
today_intervals = get_intervals_for_day_offsets(coordinator_data, [0])
tomorrow_intervals = get_intervals_for_day_offsets(coordinator_data, [1])
# Use first startsAt of each day as representative for entire day's data
# If day is empty, use None (detects data availability changes)
today_start = today_intervals[0].get("startsAt") if today_intervals else None
tomorrow_start = tomorrow_intervals[0].get("startsAt") if tomorrow_intervals else None
def _build_interval_signature(intervals: list[dict[str, Any]]) -> tuple[tuple[Any, Any, Any, Any, Any], ...]:
signature: list[tuple[Any, Any, Any, Any, Any]] = []
for interval in intervals:
starts_at = interval.get("startsAt")
starts_at_key = (
starts_at.isoformat() if starts_at is not None and hasattr(starts_at, "isoformat") else starts_at
)
total = interval.get("total")
total_key = round(float(total), 6) if total is not None else None
difference = interval.get("difference")
difference_key = round(float(difference), 6) if difference is not None else None
signature.append(
(
starts_at_key,
total_key,
interval.get("level"),
interval.get("rating_level"),
difference_key,
)
)
return tuple(signature)
today_signature = _build_interval_signature(today_intervals)
tomorrow_signature = _build_interval_signature(tomorrow_intervals)
# Get period configs (both best and peak)
best_config = self.get_period_config(reverse_sort=False)
@ -128,8 +198,8 @@ class TibberPricesPeriodCalculator:
# Compute hash from all relevant data
hash_data = (
today_start, # Representative for today's data (changes at midnight)
tomorrow_start, # Representative for tomorrow's data (changes when data arrives)
today_signature,
tomorrow_signature,
tuple(best_config.items()),
tuple(peak_config.items()),
best_level_filter,
@ -195,32 +265,51 @@ class TibberPricesPeriodCalculator:
_const.DEFAULT_BEST_PRICE_MIN_PERIOD_LENGTH,
)
# Convert flex from percentage to decimal (e.g., 5 -> 0.05)
# CRITICAL: Normalize to absolute value for internal calculations
# User-facing values use sign convention:
# - Best price: positive (e.g., +15% above minimum)
# - Peak price: negative (e.g., -20% below maximum)
# Internal calculations always use positive values with reverse_sort flag
try:
flex = abs(float(flex)) / 100 # Always positive internally
except TypeError, ValueError:
flex = (
abs(_const.DEFAULT_BEST_PRICE_FLEX) / 100
if not reverse_sort
else abs(_const.DEFAULT_PEAK_PRICE_FLEX) / 100
)
default_flex = _const.DEFAULT_PEAK_PRICE_FLEX if reverse_sort else _const.DEFAULT_BEST_PRICE_FLEX
default_min_distance = (
_const.DEFAULT_PEAK_PRICE_MIN_DISTANCE_FROM_AVG
if reverse_sort
else _const.DEFAULT_BEST_PRICE_MIN_DISTANCE_FROM_AVG
)
default_min_period_length = (
_const.DEFAULT_PEAK_PRICE_MIN_PERIOD_LENGTH if reverse_sort else _const.DEFAULT_BEST_PRICE_MIN_PERIOD_LENGTH
)
# CRITICAL: Normalize min_distance_from_avg to absolute value
# User-facing values use sign convention:
# - Best price: negative (e.g., -5% below average)
# - Peak price: positive (e.g., +5% above average)
# Internal calculations always use positive values with reverse_sort flag
min_distance_from_avg_normalized = abs(float(min_distance_from_avg))
# Convert flex from percentage to decimal (e.g., 5 -> 0.05)
# and normalize sign conventions to positive internal values.
flex = self._normalize_float_option(
flex,
default_flex,
option_name=_const.CONF_PEAK_PRICE_FLEX if reverse_sort else _const.CONF_BEST_PRICE_FLEX,
absolute=True,
divisor=100,
)
# CRITICAL: Normalize min_distance_from_avg to absolute value.
min_distance_from_avg_normalized = self._normalize_float_option(
min_distance_from_avg,
default_min_distance,
option_name=(
_const.CONF_PEAK_PRICE_MIN_DISTANCE_FROM_AVG
if reverse_sort
else _const.CONF_BEST_PRICE_MIN_DISTANCE_FROM_AVG
),
absolute=True,
)
config = {
"flex": flex,
"min_distance_from_avg": min_distance_from_avg_normalized,
"min_period_length": int(min_period_length),
"min_period_length": self._normalize_int_option(
min_period_length,
default_min_period_length,
option_name=(
_const.CONF_PEAK_PRICE_MIN_PERIOD_LENGTH
if reverse_sort
else _const.CONF_BEST_PRICE_MIN_PERIOD_LENGTH
),
minimum=1,
),
}
# Extension settings (stored in 'extension_settings' nested section)
@ -232,12 +321,15 @@ class TibberPricesPeriodCalculator:
_const.DEFAULT_PEAK_PRICE_EXTEND_TO_VERY_EXPENSIVE,
)
)
max_extension_intervals = int(
max_extension_intervals = self._normalize_int_option(
self._get_option(
_const.CONF_PEAK_PRICE_MAX_EXTENSION_INTERVALS,
"extension_settings",
_const.DEFAULT_PEAK_PRICE_MAX_EXTENSION_INTERVALS,
)
),
_const.DEFAULT_PEAK_PRICE_MAX_EXTENSION_INTERVALS,
option_name=_const.CONF_PEAK_PRICE_MAX_EXTENSION_INTERVALS,
minimum=0,
)
else:
extend_to_extreme = bool(
@ -247,12 +339,15 @@ class TibberPricesPeriodCalculator:
_const.DEFAULT_BEST_PRICE_EXTEND_TO_VERY_CHEAP,
)
)
max_extension_intervals = int(
max_extension_intervals = self._normalize_int_option(
self._get_option(
_const.CONF_BEST_PRICE_MAX_EXTENSION_INTERVALS,
"extension_settings",
_const.DEFAULT_BEST_PRICE_MAX_EXTENSION_INTERVALS,
)
),
_const.DEFAULT_BEST_PRICE_MAX_EXTENSION_INTERVALS,
option_name=_const.CONF_BEST_PRICE_MAX_EXTENSION_INTERVALS,
minimum=0,
)
config["extend_to_extreme"] = extend_to_extreme
@ -260,20 +355,26 @@ class TibberPricesPeriodCalculator:
# Geometric flex bonus (intervals inside valley/peak zone get extra flex)
if reverse_sort:
geometric_flex_pct = int(
geometric_flex_pct = self._normalize_int_option(
self._get_option(
_const.CONF_PEAK_PRICE_GEOMETRIC_FLEX,
"extension_settings",
_const.DEFAULT_PEAK_PRICE_GEOMETRIC_FLEX,
)
),
_const.DEFAULT_PEAK_PRICE_GEOMETRIC_FLEX,
option_name=_const.CONF_PEAK_PRICE_GEOMETRIC_FLEX,
minimum=0,
)
else:
geometric_flex_pct = int(
geometric_flex_pct = self._normalize_int_option(
self._get_option(
_const.CONF_BEST_PRICE_GEOMETRIC_FLEX,
"extension_settings",
_const.DEFAULT_BEST_PRICE_GEOMETRIC_FLEX,
)
),
_const.DEFAULT_BEST_PRICE_GEOMETRIC_FLEX,
option_name=_const.CONF_BEST_PRICE_GEOMETRIC_FLEX,
minimum=0,
)
config["geometric_extra_flex"] = geometric_flex_pct / 100
@ -286,12 +387,15 @@ class TibberPricesPeriodCalculator:
_const.DEFAULT_PEAK_PRICE_SEGMENT_FORCING,
)
)
segment_min_periods = int(
segment_min_periods = self._normalize_int_option(
self._get_option(
_const.CONF_PEAK_PRICE_SEGMENT_MIN_PERIODS,
"extension_settings",
_const.DEFAULT_PEAK_PRICE_SEGMENT_MIN_PERIODS,
)
),
_const.DEFAULT_PEAK_PRICE_SEGMENT_MIN_PERIODS,
option_name=_const.CONF_PEAK_PRICE_SEGMENT_MIN_PERIODS,
minimum=1,
)
else:
segment_forcing = bool(
@ -301,12 +405,15 @@ class TibberPricesPeriodCalculator:
_const.DEFAULT_BEST_PRICE_SEGMENT_FORCING,
)
)
segment_min_periods = int(
segment_min_periods = self._normalize_int_option(
self._get_option(
_const.CONF_BEST_PRICE_SEGMENT_MIN_PERIODS,
"extension_settings",
_const.DEFAULT_BEST_PRICE_SEGMENT_MIN_PERIODS,
)
),
_const.DEFAULT_BEST_PRICE_SEGMENT_MIN_PERIODS,
option_name=_const.CONF_BEST_PRICE_SEGMENT_MIN_PERIODS,
minimum=1,
)
config["segment_forcing"] = segment_forcing
config["segment_min_periods"] = segment_min_periods
@ -318,7 +425,7 @@ class TibberPricesPeriodCalculator:
def should_show_periods(
self,
price_info: dict[str, Any],
price_info: list[dict[str, Any]],
*,
reverse_sort: bool,
level_override: str | None = None,
@ -327,7 +434,7 @@ class TibberPricesPeriodCalculator:
Check if periods should be shown based on level filter only.
Args:
price_info: Price information dict with today/yesterday/tomorrow data
price_info: Flat list of price intervals (yesterday/today/tomorrow)
reverse_sort: If False (best_price), checks max_level filter.
If True (peak_price), checks min_level filter.
level_override: Optional override for level filter ("any" to disable)
@ -486,16 +593,27 @@ class TibberPricesPeriodCalculator:
# Normal check failed - try splitting at gap clusters as fallback
# Get minimum period length from config (convert minutes to intervals)
period_settings = self.config_entry.options.get("period_settings", {})
if reverse_sort:
min_period_minutes = period_settings.get(
_const.CONF_PEAK_PRICE_MIN_PERIOD_LENGTH,
min_period_minutes = self._normalize_int_option(
self._get_option(
_const.CONF_PEAK_PRICE_MIN_PERIOD_LENGTH,
"period_settings",
_const.DEFAULT_PEAK_PRICE_MIN_PERIOD_LENGTH,
),
_const.DEFAULT_PEAK_PRICE_MIN_PERIOD_LENGTH,
option_name=_const.CONF_PEAK_PRICE_MIN_PERIOD_LENGTH,
minimum=1,
)
else:
min_period_minutes = period_settings.get(
_const.CONF_BEST_PRICE_MIN_PERIOD_LENGTH,
min_period_minutes = self._normalize_int_option(
self._get_option(
_const.CONF_BEST_PRICE_MIN_PERIOD_LENGTH,
"period_settings",
_const.DEFAULT_BEST_PRICE_MIN_PERIOD_LENGTH,
),
_const.DEFAULT_BEST_PRICE_MIN_PERIOD_LENGTH,
option_name=_const.CONF_BEST_PRICE_MIN_PERIOD_LENGTH,
minimum=1,
)
min_period_intervals = self.time.minutes_to_intervals(min_period_minutes)
@ -590,7 +708,7 @@ class TibberPricesPeriodCalculator:
def check_level_filter(
self,
price_info: dict[str, Any],
price_info: list[dict[str, Any]],
*,
reverse_sort: bool,
override: str | None = None,
@ -602,7 +720,7 @@ class TibberPricesPeriodCalculator:
to deviate by one level step (e.g., CHEAP allows NORMAL, but not EXPENSIVE).
Args:
price_info: Price information dict with today data
price_info: Flat list of price intervals used for today's level check
reverse_sort: If False (best_price), checks max_level (upper bound filter).
If True (peak_price), checks min_level (lower bound filter).
override: Optional override value (e.g., "any" to disable filter)
@ -644,16 +762,27 @@ class TibberPricesPeriodCalculator:
return True # If no data, don't filter
# Get gap tolerance configuration
period_settings = self.config_entry.options.get("period_settings", {})
if reverse_sort:
max_gap_count = period_settings.get(
_const.CONF_PEAK_PRICE_MAX_LEVEL_GAP_COUNT,
max_gap_count = self._normalize_int_option(
self._get_option(
_const.CONF_PEAK_PRICE_MAX_LEVEL_GAP_COUNT,
"period_settings",
_const.DEFAULT_PEAK_PRICE_MAX_LEVEL_GAP_COUNT,
),
_const.DEFAULT_PEAK_PRICE_MAX_LEVEL_GAP_COUNT,
option_name=_const.CONF_PEAK_PRICE_MAX_LEVEL_GAP_COUNT,
minimum=0,
)
else:
max_gap_count = period_settings.get(
_const.CONF_BEST_PRICE_MAX_LEVEL_GAP_COUNT,
max_gap_count = self._normalize_int_option(
self._get_option(
_const.CONF_BEST_PRICE_MAX_LEVEL_GAP_COUNT,
"period_settings",
_const.DEFAULT_BEST_PRICE_MAX_LEVEL_GAP_COUNT,
),
_const.DEFAULT_BEST_PRICE_MAX_LEVEL_GAP_COUNT,
option_name=_const.CONF_BEST_PRICE_MAX_LEVEL_GAP_COUNT,
minimum=0,
)
# Note: level_config is lowercase from selector, but _const.PRICE_LEVEL_MAPPING uses uppercase
@ -683,7 +812,7 @@ class TibberPricesPeriodCalculator:
def calculate_periods_for_price_info(
self,
price_info: dict[str, Any],
price_info: list[dict[str, Any]],
day_patterns: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""
@ -724,28 +853,48 @@ class TibberPricesPeriodCalculator:
# Get rating thresholds from config (flat in options, not in sections)
# CRITICAL: Price rating thresholds are stored FLAT in options (no sections)
threshold_low = self.config_entry.options.get(
_const.CONF_PRICE_RATING_THRESHOLD_LOW,
threshold_low = self._normalize_float_option(
self.config_entry.options.get(
_const.CONF_PRICE_RATING_THRESHOLD_LOW,
_const.DEFAULT_PRICE_RATING_THRESHOLD_LOW,
),
_const.DEFAULT_PRICE_RATING_THRESHOLD_LOW,
option_name=_const.CONF_PRICE_RATING_THRESHOLD_LOW,
)
threshold_high = self.config_entry.options.get(
_const.CONF_PRICE_RATING_THRESHOLD_HIGH,
threshold_high = self._normalize_float_option(
self.config_entry.options.get(
_const.CONF_PRICE_RATING_THRESHOLD_HIGH,
_const.DEFAULT_PRICE_RATING_THRESHOLD_HIGH,
),
_const.DEFAULT_PRICE_RATING_THRESHOLD_HIGH,
option_name=_const.CONF_PRICE_RATING_THRESHOLD_HIGH,
)
# Get volatility thresholds from config (flat in options, not in sections)
# CRITICAL: Volatility thresholds are stored FLAT in options (no sections)
threshold_volatility_moderate = self.config_entry.options.get(
_const.CONF_VOLATILITY_THRESHOLD_MODERATE,
threshold_volatility_moderate = self._normalize_float_option(
self.config_entry.options.get(
_const.CONF_VOLATILITY_THRESHOLD_MODERATE,
_const.DEFAULT_VOLATILITY_THRESHOLD_MODERATE,
),
_const.DEFAULT_VOLATILITY_THRESHOLD_MODERATE,
option_name=_const.CONF_VOLATILITY_THRESHOLD_MODERATE,
)
threshold_volatility_high = self.config_entry.options.get(
_const.CONF_VOLATILITY_THRESHOLD_HIGH,
threshold_volatility_high = self._normalize_float_option(
self.config_entry.options.get(
_const.CONF_VOLATILITY_THRESHOLD_HIGH,
_const.DEFAULT_VOLATILITY_THRESHOLD_HIGH,
),
_const.DEFAULT_VOLATILITY_THRESHOLD_HIGH,
option_name=_const.CONF_VOLATILITY_THRESHOLD_HIGH,
)
threshold_volatility_very_high = self.config_entry.options.get(
_const.CONF_VOLATILITY_THRESHOLD_VERY_HIGH,
threshold_volatility_very_high = self._normalize_float_option(
self.config_entry.options.get(
_const.CONF_VOLATILITY_THRESHOLD_VERY_HIGH,
_const.DEFAULT_VOLATILITY_THRESHOLD_VERY_HIGH,
),
_const.DEFAULT_VOLATILITY_THRESHOLD_VERY_HIGH,
option_name=_const.CONF_VOLATILITY_THRESHOLD_VERY_HIGH,
)
# Get relaxation configuration for best price
@ -764,15 +913,25 @@ class TibberPricesPeriodCalculator:
show_best_price = bool(all_prices)
else:
show_best_price = self.should_show_periods(price_info, reverse_sort=False) if all_prices else False
min_periods_best = self._get_option(
_const.CONF_MIN_PERIODS_BEST,
"relaxation_and_target_periods",
min_periods_best = self._normalize_int_option(
self._get_option(
_const.CONF_MIN_PERIODS_BEST,
"relaxation_and_target_periods",
_const.DEFAULT_MIN_PERIODS_BEST,
),
_const.DEFAULT_MIN_PERIODS_BEST,
option_name=_const.CONF_MIN_PERIODS_BEST,
minimum=1,
)
relaxation_attempts_best = self._get_option(
_const.CONF_RELAXATION_ATTEMPTS_BEST,
"relaxation_and_target_periods",
relaxation_attempts_best = self._normalize_int_option(
self._get_option(
_const.CONF_RELAXATION_ATTEMPTS_BEST,
"relaxation_and_target_periods",
_const.DEFAULT_RELAXATION_ATTEMPTS_BEST,
),
_const.DEFAULT_RELAXATION_ATTEMPTS_BEST,
option_name=_const.CONF_RELAXATION_ATTEMPTS_BEST,
minimum=1,
)
# Calculate best price periods (or return empty if filtered)
@ -785,10 +944,15 @@ class TibberPricesPeriodCalculator:
"period_settings",
_const.DEFAULT_BEST_PRICE_MAX_LEVEL,
)
gap_count_best = self._get_option(
_const.CONF_BEST_PRICE_MAX_LEVEL_GAP_COUNT,
"period_settings",
gap_count_best = self._normalize_int_option(
self._get_option(
_const.CONF_BEST_PRICE_MAX_LEVEL_GAP_COUNT,
"period_settings",
_const.DEFAULT_BEST_PRICE_MAX_LEVEL_GAP_COUNT,
),
_const.DEFAULT_BEST_PRICE_MAX_LEVEL_GAP_COUNT,
option_name=_const.CONF_BEST_PRICE_MAX_LEVEL_GAP_COUNT,
minimum=0,
)
best_period_config = TibberPricesPeriodConfig(
reverse_sort=False,
@ -851,15 +1015,25 @@ class TibberPricesPeriodCalculator:
show_peak_price = bool(all_prices)
else:
show_peak_price = self.should_show_periods(price_info, reverse_sort=True) if all_prices else False
min_periods_peak = self._get_option(
_const.CONF_MIN_PERIODS_PEAK,
"relaxation_and_target_periods",
min_periods_peak = self._normalize_int_option(
self._get_option(
_const.CONF_MIN_PERIODS_PEAK,
"relaxation_and_target_periods",
_const.DEFAULT_MIN_PERIODS_PEAK,
),
_const.DEFAULT_MIN_PERIODS_PEAK,
option_name=_const.CONF_MIN_PERIODS_PEAK,
minimum=1,
)
relaxation_attempts_peak = self._get_option(
_const.CONF_RELAXATION_ATTEMPTS_PEAK,
"relaxation_and_target_periods",
relaxation_attempts_peak = self._normalize_int_option(
self._get_option(
_const.CONF_RELAXATION_ATTEMPTS_PEAK,
"relaxation_and_target_periods",
_const.DEFAULT_RELAXATION_ATTEMPTS_PEAK,
),
_const.DEFAULT_RELAXATION_ATTEMPTS_PEAK,
option_name=_const.CONF_RELAXATION_ATTEMPTS_PEAK,
minimum=1,
)
# Calculate peak price periods (or return empty if filtered)
@ -872,10 +1046,15 @@ class TibberPricesPeriodCalculator:
"period_settings",
_const.DEFAULT_PEAK_PRICE_MIN_LEVEL,
)
gap_count_peak = self._get_option(
_const.CONF_PEAK_PRICE_MAX_LEVEL_GAP_COUNT,
"period_settings",
gap_count_peak = self._normalize_int_option(
self._get_option(
_const.CONF_PEAK_PRICE_MAX_LEVEL_GAP_COUNT,
"period_settings",
_const.DEFAULT_PEAK_PRICE_MAX_LEVEL_GAP_COUNT,
),
_const.DEFAULT_PEAK_PRICE_MAX_LEVEL_GAP_COUNT,
option_name=_const.CONF_PEAK_PRICE_MAX_LEVEL_GAP_COUNT,
minimum=0,
)
peak_period_config = TibberPricesPeriodConfig(
reverse_sort=True,

View file

@ -1,15 +1,17 @@
#!/bin/bash
# script/check-all: Run full checks for Python and non-Python files
# script/check-all: Run full checks for Python/non-Python files, optionally incl. test Pyright
#
# Runs project checks and validates formatting/lint state for common
# non-Python files.
# non-Python files. Optionally includes Pyright checks for tests.
#
# Usage:
# ./scripts/check-all
# ./scripts/check-all --with-test-types
#
# Examples:
# ./scripts/check-all
# ./scripts/check-all --with-test-types
set -euo pipefail
@ -19,6 +21,21 @@ cd "$SCRIPT_DIR/.."
# shellcheck source=scripts/.lib/output.sh
source "$SCRIPT_DIR/.lib/output.sh"
run_test_type_check=false
for arg in "$@"; do
case "$arg" in
--with-test-types)
run_test_type_check=true
;;
*)
log_error "Unknown argument: $arg"
log_info "Usage: ./scripts/check-all [--with-test-types]"
exit 1
;;
esac
done
collect_shell_files() {
local files=()
local file shebang
@ -40,6 +57,11 @@ collect_shell_files() {
log_header "Running Python checks"
"$SCRIPT_DIR/check"
if [[ $run_test_type_check == true ]]; then
log_header "Running Pyright checks for tests"
"$SCRIPT_DIR/type-check-tests"
fi
log_header "Checking JSON/JSONC/Markdown with Prettier"
npx --yes prettier --check "**/*.{json,jsonc,md,yml,yaml}"

34
scripts/type-check-tests Executable file
View file

@ -0,0 +1,34 @@
#!/bin/bash
# script/type-check-tests: Run optional Pyright checks for test files
#
# Runs Pyright on tests without changing the main repository type-check scope.
# Defaults to the full tests/ tree, but accepts optional file or folder targets.
#
# Usage:
# ./scripts/type-check-tests
# ./scripts/type-check-tests tests/test_period_overlap.py tests/test_periods_hash.py
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
cd "$SCRIPT_DIR/.."
# shellcheck source=scripts/.lib/output.sh
source "$SCRIPT_DIR/.lib/output.sh"
if [[ -z ${VIRTUAL_ENV:-} ]]; then
# shellcheck source=/dev/null
source "$HOME/.venv/bin/activate"
fi
targets=("$@")
if [[ ${#targets[@]} -eq 0 ]]; then
targets=("tests")
fi
log_header "Running type checking tools for test files"
pyright "${targets[@]}"
log_success "Test type checking completed"

View file

@ -3,6 +3,7 @@
from __future__ import annotations
from datetime import UTC, datetime, timedelta
from zoneinfo import ZoneInfo
from custom_components.tibber_prices.services.charging.deadline_solver import resolve_deadline
from custom_components.tibber_prices.services.charging.power_scheduler import build_power_schedule
@ -53,6 +54,7 @@ def test_stepped_mode_uses_smallest_sufficient_step() -> None:
def test_resolve_deadline_next_peak_period() -> None:
"""Deadline helper should resolve the next future peak period start."""
now = datetime(2026, 1, 1, 0, 0, tzinfo=UTC)
home_tz = ZoneInfo("UTC")
coordinator_data = {
"pricePeriods": {
"peak_price": {
@ -69,7 +71,7 @@ def test_resolve_deadline_next_peak_period() -> None:
deadline, source = resolve_deadline(
coordinator_data=coordinator_data,
now=now,
home_tz=UTC,
home_tz=home_tz,
must_reach_by_event="next_peak_period",
)

View file

@ -0,0 +1,71 @@
"""Regression tests for visible best/peak period binary sensor attributes."""
from __future__ import annotations
from unittest.mock import Mock
import pytest
from custom_components.tibber_prices.binary_sensor.attributes import build_final_attributes_simple
from custom_components.tibber_prices.const import CONF_CURRENCY_DISPLAY_MODE, DISPLAY_MODE_BASE
from custom_components.tibber_prices.coordinator.time_service import TibberPricesTimeService
from homeassistant.util import dt as dt_util
def _dt(value: str):
"""Parse a timezone-aware datetime string for tests."""
parsed = dt_util.parse_datetime(value)
assert parsed is not None
return parsed
@pytest.mark.unit
def test_build_final_attributes_exposes_day_statistics_on_current_period() -> None:
"""Day-level period context should be exposed on the visible binary sensor attrs."""
current_period = {
"start": _dt("2025-11-22T12:00:00+01:00"),
"end": _dt("2025-11-22T13:00:00+01:00"),
"duration_minutes": 60,
"level": "CHEAP",
"rating_level": "LOW",
"rating_difference_%": -15.0,
"price_mean": -0.1,
"price_median": -0.12,
"price_min": -0.3,
"price_max": 0.1,
"price_spread": 0.4,
"price_coefficient_variation_%": 12.3,
"volatility": "moderate",
"period_price_diff_from_daily_min": 0.2,
"period_price_diff_from_daily_min_%": 200.0,
"day_volatility_%": 400.0,
"day_price_min": -30.0,
"day_price_max": 10.0,
"day_price_span": 40.0,
"period_interval_count": 4,
"period_position": 1,
"period_count_total": 1,
"period_count_remaining": 0,
}
time = TibberPricesTimeService(reference_time=_dt("2025-11-22T12:00:00+01:00"))
config_entry = Mock(options={CONF_CURRENCY_DISPLAY_MODE: DISPLAY_MODE_BASE})
attributes = build_final_attributes_simple(
current_period,
[current_period],
time=time,
config_entry=config_entry,
)
assert attributes["price_mean"] == -0.1
assert attributes["period_price_diff_from_daily_min"] == 0.2
assert attributes["day_volatility_%"] == 400.0
assert attributes["day_price_min"] == -30.0
assert attributes["day_price_max"] == 10.0
assert attributes["day_price_span"] == 40.0
nested_period = attributes["periods"][0]
assert nested_period["day_volatility_%"] == 400.0
assert nested_period["day_price_min"] == -30.0
assert nested_period["day_price_max"] == 10.0
assert nested_period["day_price_span"] == 40.0

View file

@ -47,12 +47,15 @@ def _create_intervals(start: datetime, count: int) -> list[dict]:
return [_create_test_interval(start + timedelta(minutes=15 * i)) for i in range(count)]
def _create_pool(api_client: MagicMock) -> TibberPricesIntervalPool:
"""Create an interval pool using the current constructor signature."""
return TibberPricesIntervalPool(home_id="home123", api=api_client)
@pytest.mark.asyncio
@pytest.mark.unit
async def test_no_cache_single_api_call() -> None:
"""Test: Empty cache → 1 API call for entire range."""
pool = TibberPricesIntervalPool(home_id="home123")
# Mock API client
api_client = MagicMock(
spec=[
@ -63,6 +66,7 @@ async def test_no_cache_single_api_call() -> None:
"_calculate_day_before_yesterday_midnight",
]
)
pool = _create_pool(api_client)
start = dt_util.now().replace(hour=10, minute=0, second=0, microsecond=0)
end = start + timedelta(hours=2) # 8 intervals
@ -80,19 +84,17 @@ async def test_no_cache_single_api_call() -> None:
user_data = {"timeZone": "Europe/Berlin"}
# Act
result = await pool.get_intervals(api_client, user_data, start, end)
intervals, _api_called = await pool.get_intervals(api_client, user_data, start, end)
# Assert: Exactly 1 API call
assert api_client.async_get_price_info_for_range.call_count == 1
assert len(result) == 8
assert len(intervals) == 8
@pytest.mark.asyncio
@pytest.mark.unit
async def test_full_cache_zero_api_calls() -> None:
"""Test: Fully cached range → 0 API calls."""
pool = TibberPricesIntervalPool(home_id="home123")
# Mock API client
api_client = MagicMock(
spec=[
@ -103,6 +105,7 @@ async def test_full_cache_zero_api_calls() -> None:
"_calculate_day_before_yesterday_midnight",
]
)
pool = _create_pool(api_client)
start = dt_util.now().replace(hour=10, minute=0, second=0, microsecond=0)
end = start + timedelta(hours=2) # 8 intervals
@ -123,19 +126,17 @@ async def test_full_cache_zero_api_calls() -> None:
assert api_client.async_get_price_info_for_range.call_count == 1
# Second call: should use cache
result = await pool.get_intervals(api_client, user_data, start, end)
intervals, _api_called = await pool.get_intervals(api_client, user_data, start, end)
# Assert: Still only 1 API call (from first request)
assert api_client.async_get_price_info_for_range.call_count == 1
assert len(result) == 8
assert len(intervals) == 8
@pytest.mark.asyncio
@pytest.mark.unit
async def test_single_gap_single_api_call() -> None:
"""Test: One gap in cache → 1 API call for that gap only."""
pool = TibberPricesIntervalPool(home_id="home123")
# Mock API client
api_client = MagicMock(
spec=[
@ -146,6 +147,7 @@ async def test_single_gap_single_api_call() -> None:
"_calculate_day_before_yesterday_midnight",
]
)
pool = _create_pool(api_client)
start = dt_util.now().replace(hour=10, minute=0, second=0, microsecond=0)
end = start + timedelta(hours=3) # 12 intervals total
@ -175,19 +177,17 @@ async def test_single_gap_single_api_call() -> None:
gap_intervals = _create_intervals(start + timedelta(hours=1), 4)
api_client.async_get_price_info_for_range = AsyncMock(return_value=gap_intervals)
result = await pool.get_intervals(api_client, user_data, start, end)
intervals, _api_called = await pool.get_intervals(api_client, user_data, start, end)
# Assert: Exactly 1 additional API call (for the gap)
assert api_client.async_get_price_info_for_range.call_count == call_count_before + 1
assert len(result) == 12 # All intervals now available
assert len(intervals) == 12 # All intervals now available
@pytest.mark.asyncio
@pytest.mark.unit
async def test_multiple_gaps_multiple_api_calls() -> None:
"""Test: Multiple gaps → one API call per continuous gap."""
pool = TibberPricesIntervalPool(home_id="home123")
# Mock API client
api_client = MagicMock(
spec=[
@ -198,6 +198,7 @@ async def test_multiple_gaps_multiple_api_calls() -> None:
"_calculate_day_before_yesterday_midnight",
]
)
pool = _create_pool(api_client)
start = dt_util.now().replace(hour=10, minute=0, second=0, microsecond=0)
end = start + timedelta(hours=4) # 16 intervals total
@ -247,19 +248,17 @@ async def test_multiple_gaps_multiple_api_calls() -> None:
api_client.async_get_price_info_for_range = AsyncMock(side_effect=mock_fetch)
result = await pool.get_intervals(api_client, user_data, start, end)
intervals, _api_called = await pool.get_intervals(api_client, user_data, start, end)
# Assert: Exactly 3 additional API calls (one per gap)
assert api_client.async_get_price_info_for_range.call_count == call_count_before + 3
assert len(result) == 16 # All intervals now available
assert len(intervals) == 16 # All intervals now available
@pytest.mark.asyncio
@pytest.mark.unit
async def test_partial_overlap_minimal_fetch() -> None:
"""Test: Overlapping request → fetch only new intervals."""
pool = TibberPricesIntervalPool(home_id="home123")
# Mock API client
api_client = MagicMock(
spec=[
@ -270,6 +269,7 @@ async def test_partial_overlap_minimal_fetch() -> None:
"_calculate_day_before_yesterday_midnight",
]
)
pool = _create_pool(api_client)
start = dt_util.now().replace(hour=10, minute=0, second=0, microsecond=0)
user_data = {"timeZone": "Europe/Berlin"}
@ -285,7 +285,7 @@ async def test_partial_overlap_minimal_fetch() -> None:
batch2 = _create_intervals(start + timedelta(hours=2), 4) # Only new ones
api_client.async_get_price_info_for_range = AsyncMock(return_value=batch2)
result = await pool.get_intervals(
intervals, _api_called = await pool.get_intervals(
api_client,
user_data,
start + timedelta(hours=1),
@ -294,15 +294,13 @@ async def test_partial_overlap_minimal_fetch() -> None:
# Assert: 1 additional API call (for 12:00-13:00 only)
assert api_client.async_get_price_info_for_range.call_count == 2
assert len(result) == 8 # 11:00-13:00
assert len(intervals) == 8 # 11:00-13:00
@pytest.mark.asyncio
@pytest.mark.unit
async def test_detect_missing_ranges_optimization() -> None:
"""Test: Gap detection returns minimal set of ranges (tested via API behavior)."""
pool = TibberPricesIntervalPool(home_id="home123")
# Mock API client that tracks calls
api_client = MagicMock(
spec=[
@ -313,6 +311,7 @@ async def test_detect_missing_ranges_optimization() -> None:
"_calculate_day_before_yesterday_midnight",
]
)
pool = _create_pool(api_client)
start = dt_util.now().replace(hour=10, minute=0, second=0, microsecond=0)
end = start + timedelta(hours=4)
@ -334,13 +333,17 @@ async def test_detect_missing_ranges_optimization() -> None:
# Manually add to cache (simulate previous fetches)
# Note: Accessing private _cache for test setup
# Single-home architecture: directly populate internal structures
pool._fetch_groups = [ # noqa: SLF001
{
"intervals": cached,
"fetch_time": dt_util.now().isoformat(),
}
]
pool._timestamp_index = {interval["startsAt"]: idx for idx, interval in enumerate(cached)} # noqa: SLF001
cache = pool._cache # noqa: SLF001
index = pool._index # noqa: SLF001
cache.set_fetch_groups(
[
{
"intervals": cached,
"fetched_at": dt_util.now(),
}
]
)
index.rebuild(cache.get_fetch_groups())
# Mock responses for the 3 expected gaps
gap1 = _create_intervals(start + timedelta(minutes=30), 2) # 10:30-11:00
@ -362,10 +365,10 @@ async def test_detect_missing_ranges_optimization() -> None:
api_client.async_get_price_info_for_range = AsyncMock(side_effect=mock_fetch)
# Request entire range - should detect exactly 3 gaps
result = await pool.get_intervals(api_client, user_data, start, end)
intervals, _api_called = await pool.get_intervals(api_client, user_data, start, end)
# Assert: Exactly 3 API calls (one per gap)
assert api_client.async_get_price_info_for_range.call_count == 3
# Verify all intervals are now available
assert len(result) == 16 # 2 + 2 + 2 + 2 + 1 + 7 = 16 intervals
assert len(intervals) == 16 # 2 + 2 + 2 + 2 + 1 + 7 = 16 intervals

View file

@ -679,3 +679,100 @@ class TestRealWorldScenarios:
assert in_flex_min is True, "Negative minimum should pass"
assert in_flex_within is True, "Within flex of negative min should pass"
@pytest.mark.unit
class TestNegativePriceFastPath:
"""
Tests for the negative/zero price fast path.
Regression Tests:
- Negative prices not fully included in best price (Apr 2026): Even with
VERY_CHEAP extension and max flex (50%), the old formula excluded large
parts of a negative-price day because the flex band was anchored at the
negative daily minimum. Example: min=-38 ct, flex=50% threshold=-19 ct,
excluding -18 ct, -10 ct, 0 ct, etc.
"""
def test_negative_price_always_qualifies_best_price(self) -> None:
"""Any negative price unconditionally qualifies as best price (fast path)."""
# Simulate tomorrow: min=-38 ct, avg=-3 ct
criteria = TibberPricesIntervalCriteria(
ref_price=-38.0,
avg_price=-3.0,
flex=0.15,
min_distance_from_avg=5.0,
reverse_sort=False,
)
for price in [-38.0, -20.0, -5.0, -0.01]:
in_flex, meets_distance = check_interval_criteria(price, criteria)
assert in_flex is True, f"Negative price {price} ct should always pass flex"
assert meets_distance is True, f"Negative price {price} ct should always pass distance"
def test_zero_price_always_qualifies_best_price(self) -> None:
"""Zero price unconditionally qualifies as best price (fast path)."""
criteria = TibberPricesIntervalCriteria(
ref_price=-10.0,
avg_price=5.0,
flex=0.15,
min_distance_from_avg=5.0,
reverse_sort=False,
)
in_flex, meets_distance = check_interval_criteria(0.0, criteria)
assert in_flex is True, "Zero price should always pass flex"
assert meets_distance is True, "Zero price should always pass distance"
def test_negative_price_does_not_qualify_peak_price(self) -> None:
"""Negative prices should NOT qualify as peak prices."""
# Even with high average, a negative price is never a "peak"
criteria = TibberPricesIntervalCriteria(
ref_price=30.0,
avg_price=20.0,
flex=0.50,
min_distance_from_avg=5.0,
reverse_sort=True,
)
in_flex, _meets_distance = check_interval_criteria(-5.0, criteria)
assert in_flex is False, "Negative price should fail peak price flex check"
def test_positive_price_is_not_auto_qualified_on_negative_day(self) -> None:
"""Positive prices must still pass the normal interval criteria."""
criteria = TibberPricesIntervalCriteria(
ref_price=-38.0,
avg_price=5.0,
flex=0.15,
min_distance_from_avg=5.0,
reverse_sort=False,
)
in_flex, meets_distance = check_interval_criteria(2.0, criteria)
assert in_flex is False, "Positive prices should not get a day-global negative halo"
assert meets_distance is True, "Distance may still pass even when flex fails"
def test_regression_old_formula_would_exclude_most_intervals(self) -> None:
"""
Regression: the old flex formula (anchored at negative min) excluded
almost all intervals on a day with extreme negative prices.
Old formula (BROKEN):
flex_base = max(|avg - min|, |min|) = max(35, 38) = 38
threshold = -38 + 38 * 0.15 = -32.3 ct
only prices -32.3 ct qualify!
New formula:
Fast path: price 0 always (True, True)
Zero-anchored: threshold = abs(-3) * 0.15 = 0.45 ct above zero
ALL negative prices qualify, plus tiny halo near 0
"""
criteria = TibberPricesIntervalCriteria(
ref_price=-38.0,
avg_price=-3.0,
flex=0.15,
min_distance_from_avg=5.0,
reverse_sort=False,
)
# All these prices should pass with new logic but would have FAILED with old logic
prices_that_now_pass = [-32.0, -20.0, -10.0, -5.0, -1.0, -0.01, 0.0]
for price in prices_that_now_pass:
in_flex, meets_distance = check_interval_criteria(price, criteria)
assert in_flex is True, f"Price {price} ct should pass with new logic (regression)"
assert meets_distance is True, f"Price {price} ct should pass distance (regression)"

View file

@ -0,0 +1,171 @@
"""Regression tests for best-price periods with negative prices."""
from __future__ import annotations
from datetime import datetime
from zoneinfo import ZoneInfo
import pytest
from custom_components.tibber_prices.coordinator.period_handlers.core import calculate_periods
from custom_components.tibber_prices.coordinator.period_handlers.types import TibberPricesPeriodConfig
from custom_components.tibber_prices.coordinator.time_service import TibberPricesTimeService
def _create_interval(dt: datetime, price: float, level: str) -> dict:
"""Create a single interval dict."""
rating = "LOW" if price <= 5.0 else "NORMAL"
return {
"startsAt": dt,
"total": price,
"level": level,
"rating_level": rating,
"_original_price": price,
}
def _build_day_with_overrides(overrides: dict[tuple[int, int], tuple[float, str]]) -> list[dict]:
"""Build a day of 15-minute intervals with targeted overrides."""
tz = ZoneInfo("Europe/Berlin")
base = datetime(2025, 4, 25, 0, 0, 0, tzinfo=tz)
intervals: list[dict] = []
for hour in range(24):
for minute in (0, 15, 30, 45):
price, level = overrides.get((hour, minute), (20.0, "NORMAL"))
intervals.append(_create_interval(base.replace(hour=hour, minute=minute), price, level))
return intervals
def _create_time_service() -> TibberPricesTimeService:
"""Create a deterministic time service for period calculation."""
tz = ZoneInfo("Europe/Berlin")
return TibberPricesTimeService(datetime(2025, 4, 25, 12, 0, 0, tzinfo=tz))
def _create_day_pattern(valley_start: tuple[int, int], valley_end: tuple[int, int]) -> dict:
"""Create a minimal day-pattern dict for geometric flex tests."""
tz = ZoneInfo("Europe/Berlin")
base = datetime(2025, 4, 25, 0, 0, 0, tzinfo=tz)
return {
"pattern": "valley",
"confidence": 1.0,
"day_cv_percent": 100.0,
"segments": [],
"extreme_time": base.replace(hour=13, minute=30),
"valley_start": base.replace(hour=valley_start[0], minute=valley_start[1]),
"valley_end": base.replace(hour=valley_end[0], minute=valley_end[1]),
"peak_start": None,
"peak_end": None,
}
@pytest.mark.unit
class TestNegativeBestPricePeriods:
"""Validate local shoulder rescue around short negative cores only."""
def test_short_negative_dip_can_be_rescued_by_local_shoulders(self) -> None:
"""A short negative core may extend into directly adjacent cheap shoulders."""
intervals = _build_day_with_overrides(
{
(10, 45): (5.0, "CHEAP"),
(11, 0): (2.0, "CHEAP"),
(11, 15): (-1.5, "VERY_CHEAP"),
(11, 30): (-1.0, "VERY_CHEAP"),
(11, 45): (2.0, "CHEAP"),
(12, 0): (5.0, "CHEAP"),
}
)
config = TibberPricesPeriodConfig(
reverse_sort=False,
flex=0.15,
min_distance_from_avg=5.0,
min_period_length=60,
)
result = calculate_periods(intervals, config=config, time=_create_time_service())
periods = result["periods"]
assert len(periods) == 1, "Expected the short negative dip to survive as one local period"
assert periods[0]["start"].hour == 11 and periods[0]["start"].minute == 0
assert periods[0]["end"].hour == 12 and periods[0]["end"].minute == 0
assert periods[0]["duration_minutes"] == 60
def test_long_negative_block_stays_negative_only(self) -> None:
"""A multi-hour negative block must not pull in positive shoulders."""
intervals = _build_day_with_overrides(
{
(10, 30): (2.0, "CHEAP"),
(10, 45): (2.0, "CHEAP"),
(11, 0): (-1.5, "VERY_CHEAP"),
(11, 15): (-1.4, "VERY_CHEAP"),
(11, 30): (-1.3, "VERY_CHEAP"),
(11, 45): (-1.2, "VERY_CHEAP"),
(12, 0): (-1.1, "VERY_CHEAP"),
(12, 15): (-1.0, "VERY_CHEAP"),
(12, 30): (-0.9, "VERY_CHEAP"),
(12, 45): (-0.8, "VERY_CHEAP"),
(13, 0): (2.0, "CHEAP"),
(13, 15): (2.0, "CHEAP"),
}
)
config = TibberPricesPeriodConfig(
reverse_sort=False,
flex=0.15,
min_distance_from_avg=5.0,
min_period_length=180,
)
result = calculate_periods(intervals, config=config, time=_create_time_service())
assert result["periods"] == [], "Long negative blocks should not be widened with positive shoulders"
def test_negative_core_ignores_geometric_and_shape_extension(self) -> None:
"""Negative best-price periods must not widen via geometric or shape extension."""
intervals = _build_day_with_overrides(
{
(11, 45): (7.93, "VERY_CHEAP"),
(12, 0): (4.5, "VERY_CHEAP"),
(12, 15): (-1.0, "VERY_CHEAP"),
(12, 30): (-2.0, "VERY_CHEAP"),
(12, 45): (-3.0, "VERY_CHEAP"),
(13, 0): (-4.0, "VERY_CHEAP"),
(13, 15): (-5.36, "VERY_CHEAP"),
(13, 30): (-4.5, "VERY_CHEAP"),
(13, 45): (-3.5, "VERY_CHEAP"),
(14, 0): (-2.5, "VERY_CHEAP"),
(14, 15): (-1.5, "VERY_CHEAP"),
(14, 30): (-0.5, "VERY_CHEAP"),
(14, 45): (2.0, "VERY_CHEAP"),
(15, 0): (4.0, "VERY_CHEAP"),
(15, 15): (7.0, "VERY_CHEAP"),
}
)
config = TibberPricesPeriodConfig(
reverse_sort=False,
flex=0.15,
min_distance_from_avg=5.0,
min_period_length=60,
extend_to_extreme=True,
max_extension_intervals=4,
geometric_extra_flex=0.20,
)
day_patterns_by_date = {
datetime(2025, 4, 25, 0, 0, 0, tzinfo=ZoneInfo("Europe/Berlin")).date(): _create_day_pattern(
(11, 45), (15, 15)
)
}
result = calculate_periods(
intervals,
config=config,
time=_create_time_service(),
day_patterns_by_date=day_patterns_by_date,
)
assert len(result["periods"]) == 1
assert result["periods"][0]["start"].hour == 12 and result["periods"][0]["start"].minute == 15
assert result["periods"][0]["end"].hour == 14 and result["periods"][0]["end"].minute == 45
assert result["periods"][0].get("geometric_extension_active") is None
assert result["periods"][0].get("extension_intervals_added") is None

View file

@ -0,0 +1,134 @@
"""Regression tests for overlap resolution and merged period summaries."""
from __future__ import annotations
from datetime import timedelta
import pytest
from custom_components.tibber_prices.coordinator.period_handlers import TibberPricesPeriodConfig
from custom_components.tibber_prices.coordinator.period_handlers.period_overlap import resolve_period_overlaps
from custom_components.tibber_prices.coordinator.time_service import TibberPricesTimeService
from custom_components.tibber_prices.utils.price import calculate_coefficient_of_variation
from homeassistant.util import dt as dt_util
def _create_interval(base_time, offset: int, price: float, level: str, difference: float, rating: str) -> dict:
"""Create one quarter-hour interval for overlap tests."""
return {
"startsAt": base_time + timedelta(minutes=offset * 15),
"total": price,
"level": level,
"difference": difference,
"rating_level": rating,
}
@pytest.mark.unit
class TestResolvePeriodOverlaps:
"""Validate merged period summaries stay consistent after overlap resolution."""
def test_merge_recomputes_summary_from_raw_intervals(self) -> None:
"""Overlapping periods should be rebuilt from the raw union, not glued summaries."""
base_time = dt_util.parse_datetime("2025-11-22T10:00:00+01:00")
assert base_time is not None
all_prices = [
_create_interval(base_time, 0, 0.10, "CHEAP", -12.0, "LOW"),
_create_interval(base_time, 1, 0.10, "CHEAP", -11.0, "LOW"),
_create_interval(base_time, 2, 0.11, "CHEAP", -10.0, "LOW"),
_create_interval(base_time, 3, 0.11, "CHEAP", -9.0, "NORMAL"),
_create_interval(base_time, 4, 0.12, "NORMAL", -4.0, "NORMAL"),
_create_interval(base_time, 5, 0.13, "NORMAL", 0.0, "NORMAL"),
_create_interval(base_time, 6, 0.13, "NORMAL", 1.0, "NORMAL"),
_create_interval(base_time, 7, 0.14, "NORMAL", 3.0, "NORMAL"),
]
config = TibberPricesPeriodConfig(
reverse_sort=False,
flex=0.15,
min_distance_from_avg=0.0,
min_period_length=60,
threshold_low=-10.0,
threshold_high=10.0,
)
time = TibberPricesTimeService(reference_time=base_time + timedelta(hours=1))
existing_period = {
"start": base_time,
"end": base_time + timedelta(minutes=75),
"duration_minutes": 75,
"level": "cheap",
"rating_level": "low",
"rating_difference_%": -9.2,
"price_mean": 0.108,
"price_median": 0.11,
"price_min": 0.10,
"price_max": 0.12,
"price_spread": 0.02,
"price_coefficient_variation_%": 7.7,
"volatility": "low",
"period_interval_count": 5,
"period_price_diff_from_daily_min": 0.008,
"period_price_diff_from_daily_min_%": 8.0,
"period_position": 1,
"period_count_total": 1,
"period_count_remaining": 0,
}
relaxed_period = {
"start": base_time + timedelta(minutes=60),
"end": base_time + timedelta(minutes=120),
"duration_minutes": 60,
"level": "normal",
"rating_level": "normal",
"rating_difference_%": 0.0,
"price_mean": 0.13,
"price_median": 0.13,
"price_min": 0.12,
"price_max": 0.14,
"price_spread": 0.02,
"price_coefficient_variation_%": 5.8,
"volatility": "low",
"period_interval_count": 4,
"period_price_diff_from_daily_min": 0.03,
"period_price_diff_from_daily_min_%": 30.0,
"period_position": 1,
"period_count_total": 1,
"period_count_remaining": 0,
"relaxation_active": True,
"relaxation_level": "flex=18.0% +level_any",
}
merged_periods, periods_added = resolve_period_overlaps(
existing_periods=[existing_period],
new_relaxed_periods=[relaxed_period],
all_prices=all_prices,
config=config,
time=time,
)
assert periods_added == 1
assert len(merged_periods) == 1
merged = merged_periods[0]
expected_prices = [0.10, 0.10, 0.11, 0.11, 0.12, 0.13, 0.13, 0.14]
expected_cv = calculate_coefficient_of_variation(expected_prices)
assert expected_cv is not None
assert merged["start"] == base_time
assert merged["end"] == base_time + timedelta(minutes=120)
assert merged["period_interval_count"] == 8
assert merged["duration_minutes"] == 120
assert merged["price_mean"] == 0.1175
assert merged["price_median"] == 0.115
assert merged["price_min"] == 0.10
assert merged["price_max"] == 0.14
assert merged["price_spread"] == 0.04
assert merged["price_coefficient_variation_%"] == round(expected_cv, 1)
assert merged["level"] == "normal"
assert merged["rating_level"] == "normal"
assert merged["rating_difference_%"] == -5.25
assert merged["period_price_diff_from_daily_min"] == 0.0175
assert merged["period_price_diff_from_daily_min_%"] == 17.5
assert merged["relaxation_active"] is True
assert merged["relaxation_level"] == "flex=18.0% +level_any"
assert "merged_from" in merged

View file

@ -0,0 +1,94 @@
"""Regression tests for period summary day statistics."""
from __future__ import annotations
from datetime import datetime
import pytest
from custom_components.tibber_prices.coordinator.period_handlers.period_statistics import build_period_summary_dict
from custom_components.tibber_prices.coordinator.period_handlers.types import (
TibberPricesPeriodData,
TibberPricesPeriodStatistics,
)
def _build_stats() -> TibberPricesPeriodStatistics:
"""Create minimal summary stats for period-summary tests."""
return TibberPricesPeriodStatistics(
aggregated_level="cheap",
aggregated_rating="low",
rating_difference_pct=-10.0,
price_mean=-0.2,
price_median=-0.2,
price_min=-0.3,
price_max=0.1,
price_spread=0.4,
volatility="moderate",
coefficient_of_variation=12.3,
period_price_diff=0.0,
period_price_diff_pct=0.0,
)
def _build_period_data(day: datetime) -> TibberPricesPeriodData:
"""Create minimal period timing data for summary tests."""
return TibberPricesPeriodData(
start_time=day.replace(hour=1),
end_time=day.replace(hour=2),
period_length=4,
period_idx=1,
total_periods=1,
)
@pytest.mark.unit
class TestPeriodSummaryDayVolatility:
"""Validate day_volatility_% semantics on extreme price days."""
def test_day_volatility_uses_absolute_average_for_negative_price_days(self) -> None:
"""Negative-average days should still report meaningful volatility percentage."""
day = datetime(2025, 11, 22)
summary = build_period_summary_dict(
_build_period_data(day),
_build_stats(),
reverse_sort=False,
price_context={
"intervals_by_day": {
day.date(): [
{"total": -0.30},
{"total": -0.10},
{"total": 0.10},
]
},
"avg_prices": {day.date(): -0.10},
},
)
assert summary["day_volatility_%"] == 400.0
assert summary["day_price_min"] == -30.0
assert summary["day_price_max"] == 10.0
assert summary["day_price_span"] == 40.0
def test_day_volatility_is_none_when_day_average_is_zero(self) -> None:
"""Zero-average days should avoid reporting a misleading 0% volatility."""
day = datetime(2025, 11, 23)
summary = build_period_summary_dict(
_build_period_data(day),
_build_stats(),
reverse_sort=False,
price_context={
"intervals_by_day": {
day.date(): [
{"total": -0.20},
{"total": 0.20},
]
},
"avg_prices": {day.date(): 0.0},
},
)
assert summary["day_volatility_%"] is None
assert summary["day_price_min"] == -20.0
assert summary["day_price_max"] == 20.0
assert summary["day_price_span"] == 40.0

233
tests/test_periods_hash.py Normal file
View file

@ -0,0 +1,233 @@
"""Regression tests for period calculation cache hashing."""
from __future__ import annotations
from copy import deepcopy
from typing import Any
from unittest.mock import Mock
import pytest
from custom_components.tibber_prices import const as _const
from custom_components.tibber_prices.coordinator import periods as periods_module
from custom_components.tibber_prices.coordinator.periods import TibberPricesPeriodCalculator
from custom_components.tibber_prices.coordinator.time_service import TibberPricesTimeService
from homeassistant.util import dt as dt_util
def _create_hash_interval(starts_at: str, price: float, level: str, rating_level: str, difference: float) -> dict:
"""Create one interval for period hash tests."""
parsed = dt_util.parse_datetime(starts_at)
assert parsed is not None
return {
"startsAt": parsed,
"total": price,
"level": level,
"rating_level": rating_level,
"difference": difference,
}
def _create_hash_price_info() -> list[dict]:
"""Create minimal today/tomorrow data for cache hash tests."""
return [
_create_hash_interval("2025-11-22T00:00:00+01:00", 0.11, "CHEAP", "LOW", -12.0),
_create_hash_interval("2025-11-22T00:15:00+01:00", 0.12, "NORMAL", "NORMAL", -4.0),
_create_hash_interval("2025-11-23T00:00:00+01:00", 0.13, "NORMAL", "NORMAL", 0.0),
_create_hash_interval("2025-11-23T00:15:00+01:00", 0.14, "EXPENSIVE", "HIGH", 12.0),
]
def _compute_hash(calculator: TibberPricesPeriodCalculator, price_info: list[dict]) -> str:
"""Call the internal periods hash helper without tripping the private-access lint rule."""
return calculator._compute_periods_hash(price_info) # noqa: SLF001 - targeted cache-key regression check
def _create_calculator(options: dict[str, Any]) -> TibberPricesPeriodCalculator:
"""Create a period calculator with deterministic test time."""
calculator = TibberPricesPeriodCalculator(Mock(options=options), "[test]")
reference_time = dt_util.parse_datetime("2025-11-22T12:00:00+01:00")
assert reference_time is not None
calculator.time = TibberPricesTimeService(reference_time=reference_time)
return calculator
@pytest.mark.unit
@pytest.mark.freeze_time("2025-11-22 12:00:00+01:00")
class TestPeriodsHash:
"""Validate that same-day value changes invalidate the period cache."""
def test_hash_changes_when_same_day_price_changes(self) -> None:
"""Changing an interval price with identical timestamps must change the cache hash."""
calculator = TibberPricesPeriodCalculator(Mock(options={}), "[test]")
original = _create_hash_price_info()
updated = deepcopy(original)
updated[1]["total"] = 0.125
assert _compute_hash(calculator, original) != _compute_hash(calculator, updated)
def test_hash_changes_when_same_day_level_changes(self) -> None:
"""Changing level/rating metadata with identical timestamps must change the cache hash."""
calculator = TibberPricesPeriodCalculator(Mock(options={}), "[test]")
original = _create_hash_price_info()
updated = deepcopy(original)
updated[1]["level"] = "CHEAP"
updated[1]["rating_level"] = "LOW"
updated[1]["difference"] = -10.0
assert _compute_hash(calculator, original) != _compute_hash(calculator, updated)
@pytest.mark.unit
@pytest.mark.freeze_time("2025-11-22 12:00:00+01:00")
class TestPeriodConfigNormalization:
"""Validate numeric period config values degrade cleanly to defaults."""
def test_get_period_config_falls_back_for_invalid_numeric_options(self) -> None:
"""Malformed config values should fall back to defaults instead of raising."""
calculator = _create_calculator(
{
"flexibility_settings": {
_const.CONF_BEST_PRICE_FLEX: "bad-flex",
_const.CONF_BEST_PRICE_MIN_DISTANCE_FROM_AVG: "bad-distance",
},
"period_settings": {
_const.CONF_BEST_PRICE_MIN_PERIOD_LENGTH: "bad-min-length",
},
"extension_settings": {
_const.CONF_BEST_PRICE_MAX_EXTENSION_INTERVALS: "bad-extension",
_const.CONF_BEST_PRICE_GEOMETRIC_FLEX: "bad-geometric",
_const.CONF_BEST_PRICE_SEGMENT_MIN_PERIODS: "bad-segments",
},
}
)
config = calculator.get_period_config(reverse_sort=False)
assert config["flex"] == abs(_const.DEFAULT_BEST_PRICE_FLEX) / 100
assert config["min_distance_from_avg"] == abs(_const.DEFAULT_BEST_PRICE_MIN_DISTANCE_FROM_AVG)
assert config["min_period_length"] == _const.DEFAULT_BEST_PRICE_MIN_PERIOD_LENGTH
assert config["max_extension_intervals"] == _const.DEFAULT_BEST_PRICE_MAX_EXTENSION_INTERVALS
assert config["geometric_extra_flex"] == _const.DEFAULT_BEST_PRICE_GEOMETRIC_FLEX / 100
assert config["segment_min_periods"] == _const.DEFAULT_BEST_PRICE_SEGMENT_MIN_PERIODS
def test_should_show_periods_falls_back_for_invalid_gap_count(self) -> None:
"""Malformed gap_count should not break day-level filter checks."""
calculator = _create_calculator(
{
"period_settings": {
_const.CONF_BEST_PRICE_MAX_LEVEL: "cheap",
_const.CONF_BEST_PRICE_MAX_LEVEL_GAP_COUNT: "bad-gap-count",
}
}
)
assert (
calculator.should_show_periods(
[
_create_hash_interval(
"2025-11-22T00:00:00+01:00", 0.11, level="CHEAP", rating_level="LOW", difference=-12.0
),
_create_hash_interval(
"2025-11-22T00:15:00+01:00", 0.10, level="CHEAP", rating_level="LOW", difference=-14.0
),
_create_hash_interval(
"2025-11-22T00:30:00+01:00", 0.09, level="CHEAP", rating_level="LOW", difference=-16.0
),
_create_hash_interval(
"2025-11-22T00:45:00+01:00", 0.08, level="CHEAP", rating_level="LOW", difference=-18.0
),
],
reverse_sort=False,
)
is True
)
def test_calculate_periods_for_price_info_falls_back_for_invalid_runtime_numbers(
self, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Runtime period calculation should use defaults when numeric overrides are malformed."""
captured_calls: list[dict[str, Any]] = []
def _fake_calculate_periods_with_relaxation(
all_prices: list[dict[str, Any]],
*,
config: Any,
enable_relaxation: bool,
min_periods: int,
max_relaxation_attempts: int,
should_show_callback: Any,
time: Any,
config_entry: Any,
day_patterns_by_date: Any,
) -> dict[str, Any]:
captured_calls.append(
{
"reverse_sort": config.reverse_sort,
"min_periods": min_periods,
"max_relaxation_attempts": max_relaxation_attempts,
"gap_count": config.gap_count,
"threshold_low": config.threshold_low,
"threshold_high": config.threshold_high,
"threshold_volatility_moderate": config.threshold_volatility_moderate,
"threshold_volatility_high": config.threshold_volatility_high,
"threshold_volatility_very_high": config.threshold_volatility_very_high,
}
)
return {
"periods": [],
"intervals": [],
"metadata": {
"total_intervals": len(all_prices),
"total_periods": 0,
"config": {},
"relaxation": {"relaxation_active": enable_relaxation, "relaxation_attempted": enable_relaxation},
},
}
monkeypatch.setattr(
periods_module, "calculate_periods_with_relaxation", _fake_calculate_periods_with_relaxation
)
calculator = _create_calculator(
{
_const.CONF_PRICE_RATING_THRESHOLD_LOW: "bad-threshold-low",
_const.CONF_PRICE_RATING_THRESHOLD_HIGH: "bad-threshold-high",
_const.CONF_VOLATILITY_THRESHOLD_MODERATE: "bad-vol-moderate",
_const.CONF_VOLATILITY_THRESHOLD_HIGH: "bad-vol-high",
_const.CONF_VOLATILITY_THRESHOLD_VERY_HIGH: "bad-vol-very-high",
"relaxation_and_target_periods": {
_const.CONF_ENABLE_MIN_PERIODS_BEST: True,
_const.CONF_ENABLE_MIN_PERIODS_PEAK: True,
_const.CONF_MIN_PERIODS_BEST: "bad-min-best",
_const.CONF_RELAXATION_ATTEMPTS_BEST: "bad-attempts-best",
_const.CONF_MIN_PERIODS_PEAK: "bad-min-peak",
_const.CONF_RELAXATION_ATTEMPTS_PEAK: "bad-attempts-peak",
},
"period_settings": {
_const.CONF_BEST_PRICE_MAX_LEVEL_GAP_COUNT: "bad-best-gap",
_const.CONF_PEAK_PRICE_MAX_LEVEL_GAP_COUNT: "bad-peak-gap",
},
}
)
result = calculator.calculate_periods_for_price_info(_create_hash_price_info())
assert result["best_price"]["metadata"]["total_periods"] == 0
assert result["peak_price"]["metadata"]["total_periods"] == 0
assert len(captured_calls) == 2
best_call = next(call for call in captured_calls if call["reverse_sort"] is False)
peak_call = next(call for call in captured_calls if call["reverse_sort"] is True)
assert best_call["min_periods"] == _const.DEFAULT_MIN_PERIODS_BEST
assert best_call["max_relaxation_attempts"] == _const.DEFAULT_RELAXATION_ATTEMPTS_BEST
assert best_call["gap_count"] == _const.DEFAULT_BEST_PRICE_MAX_LEVEL_GAP_COUNT
assert peak_call["min_periods"] == _const.DEFAULT_MIN_PERIODS_PEAK
assert peak_call["max_relaxation_attempts"] == _const.DEFAULT_RELAXATION_ATTEMPTS_PEAK
assert peak_call["gap_count"] == _const.DEFAULT_PEAK_PRICE_MAX_LEVEL_GAP_COUNT
assert best_call["threshold_low"] == _const.DEFAULT_PRICE_RATING_THRESHOLD_LOW
assert best_call["threshold_high"] == _const.DEFAULT_PRICE_RATING_THRESHOLD_HIGH
assert best_call["threshold_volatility_moderate"] == _const.DEFAULT_VOLATILITY_THRESHOLD_MODERATE
assert best_call["threshold_volatility_high"] == _const.DEFAULT_VOLATILITY_THRESHOLD_HIGH
assert best_call["threshold_volatility_very_high"] == _const.DEFAULT_VOLATILITY_THRESHOLD_VERY_HIGH

View file

@ -541,6 +541,9 @@ class TestPriceComparison:
cheap_stats = calculate_window_statistics(cheapest["intervals"])
expensive_stats = calculate_window_statistics(most_expensive["intervals"])
assert cheap_stats["price_mean"] is not None
assert expensive_stats["price_mean"] is not None
spread_cheap_to_exp = expensive_stats["price_mean"] - cheap_stats["price_mean"]
spread_exp_to_cheap = cheap_stats["price_mean"] - expensive_stats["price_mean"]
@ -582,6 +585,9 @@ class TestPriceComparison:
cheap_stats = calculate_window_statistics(cheapest["intervals"])
expensive_stats = calculate_window_statistics(most_expensive["intervals"])
assert cheap_stats["price_mean"] is not None
assert expensive_stats["price_mean"] is not None
spread = expensive_stats["price_mean"] - cheap_stats["price_mean"]
assert abs(spread) < 0.0001
@ -598,5 +604,8 @@ class TestPriceComparison:
cheap_stats = calculate_window_statistics(cheapest["intervals"])
expensive_stats = calculate_window_statistics(most_expensive["intervals"])
assert cheap_stats["price_mean"] is not None
assert expensive_stats["price_mean"] is not None
spread = expensive_stats["price_mean"] - cheap_stats["price_mean"]
assert abs(spread) < 0.0001

View file

@ -280,6 +280,7 @@ def test_hysteresis_sequence_simulation() -> None:
rating = calculate_rating_level(
diff, threshold_low, threshold_high, previous_rating=previous, hysteresis=hysteresis
)
assert rating is not None
results_with.append(rating)
previous = rating
assert results_with == expected_with_hysteresis

View file

@ -202,7 +202,7 @@ class TestConfigEntryCleanup:
coordinator._listener_manager = object.__new__(TibberPricesListenerManager) # noqa: SLF001
coordinator._data_transformer = object.__new__(TibberPricesDataTransformer) # noqa: SLF001
coordinator._period_calculator = object.__new__(TibberPricesPeriodCalculator) # noqa: SLF001
coordinator._lifecycle_callbacks = [] # noqa: SLF001
setattr(coordinator, "_lifecycle_callbacks", [])
# Manually call the registration that happens in __init__
# This tests the pattern: entry.async_on_unload(entry.add_update_listener(...))
@ -249,7 +249,7 @@ class TestCacheInvalidation:
# Create calculator with cached data
calculator = object.__new__(TibberPricesPeriodCalculator)
calculator._config_cache = {"some": "data"} # noqa: SLF001
calculator._config_cache = {"best": {"some": "data"}} # noqa: SLF001
calculator._config_cache_valid = True # noqa: SLF001
calculator._cached_periods = {"cached": "periods"} # noqa: SLF001
calculator._last_periods_hash = "some_hash" # noqa: SLF001