fix(rating): improve gap tolerance to find dominant large blocks

The gap tolerance algorithm now looks through small intermediate blocks
to find the first LARGE block (> gap_tolerance) in each direction.
This ensures small isolated rating intervals are merged into the
correct dominant block, not just the nearest neighbor.

Example: NORMAL(large) HIGH(1) NORMAL(1) HIGH(large)
Before: HIGH at 05:45 merged into NORMAL (wrong - nearest neighbor)
After:  NORMAL at 06:00 merged into HIGH (correct - dominant block)

Also collects all merge decisions BEFORE applying them, preventing
order-dependent outcomes when multiple small blocks are adjacent.

Impact: Rating transitions now appear at visually logical positions
where prices actually change direction, not at arbitrary boundaries.
This commit is contained in:
Julian Pawlowski 2025-12-22 13:28:25 +00:00
parent ba032a1c94
commit 64cf842719
3 changed files with 665 additions and 25 deletions

View file

@ -92,6 +92,8 @@ DEFAULT_BEST_PRICE_MIN_PERIOD_LENGTH = 60 # 60 minutes minimum period length fo
DEFAULT_PEAK_PRICE_MIN_PERIOD_LENGTH = 30 # 30 minutes minimum period length for peak price (user-facing, minutes)
DEFAULT_PRICE_RATING_THRESHOLD_LOW = -10 # Default rating threshold low percentage
DEFAULT_PRICE_RATING_THRESHOLD_HIGH = 10 # Default rating threshold high percentage
DEFAULT_PRICE_RATING_HYSTERESIS = 2.0 # Hysteresis percentage to prevent flickering at threshold boundaries
DEFAULT_PRICE_RATING_GAP_TOLERANCE = 1 # Max consecutive intervals to smooth out (0 = disabled)
DEFAULT_AVERAGE_SENSOR_DISPLAY = "median" # Default: show median in state, mean in attributes
DEFAULT_PRICE_TREND_THRESHOLD_RISING = 3 # Default trend threshold for rising prices (%)
DEFAULT_PRICE_TREND_THRESHOLD_FALLING = -3 # Default trend threshold for falling prices (%, negative value)

View file

@ -11,6 +11,8 @@ if TYPE_CHECKING:
from custom_components.tibber_prices.coordinator.time_service import TibberPricesTimeService
from custom_components.tibber_prices.const import (
DEFAULT_PRICE_RATING_GAP_TOLERANCE,
DEFAULT_PRICE_RATING_HYSTERESIS,
DEFAULT_VOLATILITY_THRESHOLD_HIGH,
DEFAULT_VOLATILITY_THRESHOLD_MODERATE,
DEFAULT_VOLATILITY_THRESHOLD_VERY_HIGH,
@ -186,27 +188,41 @@ def calculate_difference_percentage(
return ((current_interval_price - trailing_average) / abs(trailing_average)) * 100
def calculate_rating_level(
def calculate_rating_level( # noqa: PLR0911 - Multiple returns justified by clear hysteresis state machine
difference: float | None,
threshold_low: float,
threshold_high: float,
*,
previous_rating: str | None = None,
hysteresis: float = 0.0,
) -> str | None:
"""
Calculate the rating level based on difference percentage and thresholds.
This mimics the API's "level" field from priceRating endpoint.
Supports hysteresis to prevent flickering at threshold boundaries. When a previous
rating is provided, the threshold for leaving that state is adjusted by the
hysteresis value, requiring a more significant change to switch states.
Args:
difference: The difference percentage (from calculate_difference_percentage)
threshold_low: The low threshold percentage (typically -100 to 0)
threshold_high: The high threshold percentage (typically 0 to 100)
previous_rating: The rating level of the previous interval (for hysteresis)
hysteresis: The hysteresis percentage (default 0.0 = no hysteresis)
Returns:
"LOW" if difference <= threshold_low
"HIGH" if difference >= threshold_high
"LOW" if difference <= threshold_low (adjusted by hysteresis)
"HIGH" if difference >= threshold_high (adjusted by hysteresis)
"NORMAL" otherwise
None if difference is None
Example with hysteresis=2.0 and threshold_low=-10:
- To enter LOW from NORMAL: difference must be <= -10% (threshold_low)
- To leave LOW back to NORMAL: difference must be > -8% (threshold_low + hysteresis)
This creates a "dead zone" that prevents rapid switching at boundaries.
"""
if difference is None:
return None
@ -222,7 +238,29 @@ def calculate_rating_level(
)
return PRICE_RATING_NORMAL
# Classify based on thresholds
# Apply hysteresis based on previous state
# The idea: make it "harder" to leave the current state than to enter it
if previous_rating == "LOW":
# Currently LOW: need to exceed threshold_low + hysteresis to leave
exit_threshold_low = threshold_low + hysteresis
if difference <= exit_threshold_low:
return "LOW"
# Check if we should go to HIGH (rare, but possible with large price swings)
if difference >= threshold_high:
return "HIGH"
return PRICE_RATING_NORMAL
if previous_rating == "HIGH":
# Currently HIGH: need to drop below threshold_high - hysteresis to leave
exit_threshold_high = threshold_high - hysteresis
if difference >= exit_threshold_high:
return "HIGH"
# Check if we should go to LOW (rare, but possible with large price swings)
if difference <= threshold_low:
return "LOW"
return PRICE_RATING_NORMAL
# No previous state or previous was NORMAL: use standard thresholds
if difference <= threshold_low:
return "LOW"
@ -232,12 +270,15 @@ def calculate_rating_level(
return PRICE_RATING_NORMAL
def _process_price_interval(
def _process_price_interval( # noqa: PLR0913 - Extra params needed for hysteresis
price_interval: dict[str, Any],
all_prices: list[dict[str, Any]],
threshold_low: float,
threshold_high: float,
) -> None:
*,
previous_rating: str | None = None,
hysteresis: float = 0.0,
) -> str | None:
"""
Process a single price interval and add difference and rating_level.
@ -246,16 +287,20 @@ def _process_price_interval(
all_prices: All available price intervals for lookback calculation
threshold_low: Low threshold percentage
threshold_high: High threshold percentage
day_label: Label for logging ("today" or "tomorrow")
previous_rating: The rating level of the previous interval (for hysteresis)
hysteresis: The hysteresis percentage to prevent flickering
Returns:
The calculated rating_level (for use as previous_rating in next call)
"""
starts_at = price_interval.get("startsAt") # Already datetime object in local timezone
if not starts_at:
return
return previous_rating
current_interval_price = price_interval.get("total")
if current_interval_price is None:
return
return previous_rating
# Calculate trailing average
trailing_avg = calculate_trailing_average_for_interval(starts_at, all_prices)
@ -265,20 +310,238 @@ def _process_price_interval(
difference = calculate_difference_percentage(float(current_interval_price), trailing_avg)
price_interval["difference"] = difference
# Calculate rating_level based on difference
rating_level = calculate_rating_level(difference, threshold_low, threshold_high)
# Calculate rating_level based on difference with hysteresis
rating_level = calculate_rating_level(
difference,
threshold_low,
threshold_high,
previous_rating=previous_rating,
hysteresis=hysteresis,
)
price_interval["rating_level"] = rating_level
else:
# Set to None if we couldn't calculate (expected for intervals in first 24h)
price_interval["difference"] = None
price_interval["rating_level"] = None
return rating_level
# Set to None if we couldn't calculate (expected for intervals in first 24h)
price_interval["difference"] = None
price_interval["rating_level"] = None
return None
def enrich_price_info_with_differences(
def _build_rating_blocks(
rated_intervals: list[tuple[int, dict[str, Any], str]],
) -> list[tuple[int, int, str, int]]:
"""
Build list of contiguous rating blocks from rated intervals.
Args:
rated_intervals: List of (original_idx, interval_dict, rating) tuples
Returns:
List of (start_idx, end_idx, rating, length) tuples where indices
refer to positions in rated_intervals
"""
blocks: list[tuple[int, int, str, int]] = []
if not rated_intervals:
return blocks
block_start = 0
current_rating = rated_intervals[0][2]
for idx in range(1, len(rated_intervals)):
if rated_intervals[idx][2] != current_rating:
# End current block
blocks.append((block_start, idx - 1, current_rating, idx - block_start))
block_start = idx
current_rating = rated_intervals[idx][2]
# Don't forget the last block
blocks.append((block_start, len(rated_intervals) - 1, current_rating, len(rated_intervals) - block_start))
return blocks
def _calculate_gravitational_pull(
blocks: list[tuple[int, int, str, int]],
block_idx: int,
direction: str,
gap_tolerance: int,
) -> tuple[int, str]:
"""
Calculate "gravitational pull" from neighboring blocks in one direction.
This finds the first LARGE block (> gap_tolerance) in the given direction
and returns its size and rating. Small intervening blocks are "looked through".
This approach ensures that small isolated blocks are always pulled toward
the dominant large block, even if there are other small blocks in between.
Args:
blocks: List of (start_idx, end_idx, rating, length) tuples
block_idx: Index of the current block being evaluated
direction: "left" or "right"
gap_tolerance: Maximum size of blocks considered "small"
Returns:
Tuple of (size, rating) of the first large block found,
or (immediate_neighbor_size, immediate_neighbor_rating) if no large block exists
"""
probe_range = range(block_idx - 1, -1, -1) if direction == "left" else range(block_idx + 1, len(blocks))
total_small_accumulated = 0
for probe_idx in probe_range:
probe_rating = blocks[probe_idx][2]
probe_size = blocks[probe_idx][3]
if probe_size > gap_tolerance:
# Found a large block - return its characteristics
# Add any accumulated small blocks of the same rating
if total_small_accumulated > 0:
return (probe_size + total_small_accumulated, probe_rating)
return (probe_size, probe_rating)
# Small block - accumulate if same rating as what we've seen
total_small_accumulated += probe_size
# No large block found - return the immediate neighbor's info
neighbor_idx = block_idx - 1 if direction == "left" else block_idx + 1
return (blocks[neighbor_idx][3], blocks[neighbor_idx][2])
def _apply_rating_gap_tolerance(
all_intervals: list[dict[str, Any]],
gap_tolerance: int,
) -> None:
"""
Apply gap tolerance to smooth out isolated rating level changes.
This is a post-processing step after hysteresis. It identifies short sequences
of intervals ( gap_tolerance) and merges them into the larger neighboring block.
The algorithm is bidirectional - it compares block sizes on both sides and
assigns the small block to whichever neighbor is larger.
This matches human intuition: a single "different" interval feels like it
should belong to the larger surrounding group.
Example with gap_tolerance=1:
LOW LOW LOW NORMAL LOW LOW LOW LOW LOW LOW LOW LOW
(single NORMAL gets merged into larger LOW block)
Example with gap_tolerance=1 (bidirectional):
NORMAL NORMAL HIGH NORMAL HIGH HIGH HIGH NORMAL NORMAL HIGH HIGH HIGH HIGH HIGH
(single NORMAL at position 4 gets merged into larger HIGH block on the right)
Args:
all_intervals: List of price intervals with rating_level already set (modified in-place)
gap_tolerance: Maximum number of consecutive "different" intervals to smooth out
Note:
- Compares block sizes on both sides and merges small blocks into larger neighbors
- If both neighbors have equal size, prefers the LEFT neighbor (earlier in time)
- Skips intervals without rating_level (None)
- Intervals must be sorted chronologically for this to work correctly
- Multiple passes may be needed as merging can create new small blocks
"""
if gap_tolerance <= 0:
return
# Extract intervals with valid rating_level in chronological order
rated_intervals: list[tuple[int, dict[str, Any], str]] = [
(i, interval, interval["rating_level"])
for i, interval in enumerate(all_intervals)
if interval.get("rating_level") is not None
]
if len(rated_intervals) < 3: # noqa: PLR2004 - Minimum 3 for before/gap/after pattern
return
# Iteratively merge small blocks until no more changes
max_iterations = 10
total_corrections = 0
for iteration in range(max_iterations):
blocks = _build_rating_blocks(rated_intervals)
corrections_this_pass = _merge_small_blocks(blocks, rated_intervals, gap_tolerance)
total_corrections += corrections_this_pass
if corrections_this_pass == 0:
break
_LOGGER.debug(
"Gap tolerance pass %d: merged %d small blocks",
iteration + 1,
corrections_this_pass,
)
if total_corrections > 0:
_LOGGER.debug("Gap tolerance: total %d block merges across all passes", total_corrections)
def _merge_small_blocks(
blocks: list[tuple[int, int, str, int]],
rated_intervals: list[tuple[int, dict[str, Any], str]],
gap_tolerance: int,
) -> int:
"""
Merge small blocks into their larger neighbors.
CRITICAL: This function collects ALL merge decisions FIRST, then applies them.
This prevents the order of processing from affecting outcomes. Without this,
earlier blocks could be merged incorrectly because the gravitational pull
calculation would see already-modified neighbors instead of the original state.
The merge decision is based on the FIRST LARGE BLOCK in each direction,
looking through any small intervening blocks. This ensures consistent
behavior when multiple small blocks are adjacent.
Args:
blocks: List of (start_idx, end_idx, rating, length) tuples
rated_intervals: List of (original_idx, interval_dict, rating) tuples (modified in-place)
gap_tolerance: Maximum size of blocks to merge
Returns:
Number of blocks merged in this pass
"""
# Phase 1: Collect all merge decisions based on ORIGINAL block state
merge_decisions: list[tuple[int, int, str]] = [] # (start_ri_idx, end_ri_idx, target_rating)
for block_idx, (start, end, rating, length) in enumerate(blocks):
if length > gap_tolerance:
continue
# Must have neighbors on BOTH sides (not an edge block)
if block_idx == 0 or block_idx == len(blocks) - 1:
continue
# Calculate gravitational pull from each direction
left_pull, left_rating = _calculate_gravitational_pull(blocks, block_idx, "left", gap_tolerance)
right_pull, right_rating = _calculate_gravitational_pull(blocks, block_idx, "right", gap_tolerance)
# Determine target rating (prefer left if equal)
target_rating = left_rating if left_pull >= right_pull else right_rating
if rating != target_rating:
merge_decisions.append((start, end, target_rating))
# Phase 2: Apply all merge decisions
for start, end, target_rating in merge_decisions:
for ri_idx in range(start, end + 1):
original_idx, interval, _old_rating = rated_intervals[ri_idx]
interval["rating_level"] = target_rating
rated_intervals[ri_idx] = (original_idx, interval, target_rating)
return len(merge_decisions)
def enrich_price_info_with_differences( # noqa: PLR0913 - Extra params for rating stabilization
all_intervals: list[dict[str, Any]],
*,
threshold_low: float | None = None,
threshold_high: float | None = None,
hysteresis: float | None = None,
gap_tolerance: int | None = None,
time: TibberPricesTimeService | None = None, # noqa: ARG001 # Used in production (via coordinator), kept for compatibility
) -> list[dict[str, Any]]:
"""
@ -287,15 +550,29 @@ def enrich_price_info_with_differences(
Computes the trailing 24-hour average, difference percentage, and rating level
for intervals that have sufficient lookback data (in-place modification).
Uses hysteresis to prevent flickering at threshold boundaries. When an interval's
difference is near a threshold, hysteresis ensures that the rating only changes
when there's a significant movement, not just minor fluctuations.
After hysteresis, applies gap tolerance as post-processing to smooth out any
remaining isolated rating changes (e.g., a single NORMAL interval surrounded
by LOW intervals gets corrected to LOW).
CRITICAL: Only enriches intervals that have at least 24 hours of prior data
available. This is determined by checking if (interval_start - earliest_interval_start) >= 24h.
Works independently of interval density (24 vs 96 intervals/day) and handles
transition periods (e.g., Oct 1, 2025) correctly.
CRITICAL: Intervals are processed in chronological order to properly apply
hysteresis. The rating_level of each interval depends on the previous interval's
rating to prevent rapid switching at threshold boundaries.
Args:
all_intervals: Flat list of all price intervals (day_before_yesterday + yesterday + today + tomorrow).
threshold_low: Low threshold percentage for rating_level (defaults to -10)
threshold_high: High threshold percentage for rating_level (defaults to 10)
hysteresis: Hysteresis percentage to prevent flickering (defaults to 2.0)
gap_tolerance: Max consecutive intervals to smooth out (defaults to 1, 0 = disabled)
time: TibberPricesTimeService instance (kept for API compatibility, not used)
Returns:
@ -311,6 +588,8 @@ def enrich_price_info_with_differences(
"""
threshold_low = threshold_low if threshold_low is not None else -10
threshold_high = threshold_high if threshold_high is not None else 10
hysteresis = hysteresis if hysteresis is not None else DEFAULT_PRICE_RATING_HYSTERESIS
gap_tolerance = gap_tolerance if gap_tolerance is not None else DEFAULT_PRICE_RATING_GAP_TOLERANCE
if not all_intervals:
return all_intervals
@ -330,25 +609,42 @@ def enrich_price_info_with_differences(
# Only intervals starting at or after this boundary have full 24h lookback
enrichment_boundary = earliest_start + timedelta(hours=24)
# Process intervals (modifies in-place)
# CRITICAL: Sort intervals by time for proper hysteresis application
# We need to process intervals in chronological order so each interval
# can use the previous interval's rating_level for hysteresis
intervals_with_time: list[tuple[dict[str, Any], datetime]] = [
(interval, starts_at) for interval in all_intervals if (starts_at := interval.get("startsAt")) is not None
]
intervals_with_time.sort(key=lambda x: x[1])
# Process intervals in chronological order (modifies in-place)
# CRITICAL: Only enrich intervals that start >= 24h after earliest data
enriched_count = 0
skipped_count = 0
previous_rating: str | None = None
for price_interval in all_intervals:
starts_at = price_interval.get("startsAt")
if not starts_at:
skipped_count += 1
continue
for price_interval, starts_at in intervals_with_time:
# Skip if interval doesn't have full 24h lookback
if starts_at < enrichment_boundary:
skipped_count += 1
continue
_process_price_interval(price_interval, all_intervals, threshold_low, threshold_high)
# Process interval and get its rating for use as previous_rating in next iteration
previous_rating = _process_price_interval(
price_interval,
all_intervals,
threshold_low,
threshold_high,
previous_rating=previous_rating,
hysteresis=hysteresis,
)
enriched_count += 1
# Apply gap tolerance as post-processing step
# This smooths out isolated rating changes that slip through hysteresis
if gap_tolerance > 0:
_apply_rating_gap_tolerance(all_intervals, gap_tolerance)
return all_intervals

View file

@ -5,7 +5,10 @@ import logging
import pytest
from _pytest.logging import LogCaptureFixture
from custom_components.tibber_prices.utils.price import calculate_rating_level
from custom_components.tibber_prices.utils.price import (
_apply_rating_gap_tolerance,
calculate_rating_level,
)
@pytest.fixture
@ -142,3 +145,342 @@ def test_rating_level_asymmetric_thresholds() -> None:
assert calculate_rating_level(-15.0, threshold_low, threshold_high) == "NORMAL"
assert calculate_rating_level(0.0, threshold_low, threshold_high) == "NORMAL"
assert calculate_rating_level(6.0, threshold_low, threshold_high) == "HIGH"
# =============================================================================
# Hysteresis Tests - Prevent flickering at threshold boundaries
# =============================================================================
def test_hysteresis_prevents_flickering_at_low_threshold() -> None:
"""Test that hysteresis prevents rapid switching at LOW threshold boundary."""
threshold_low = -10.0
threshold_high = 10.0
hysteresis = 2.0
# Without previous state: enters LOW at threshold
assert calculate_rating_level(-10.0, threshold_low, threshold_high) == "LOW"
# With previous state LOW: stays LOW until exceeds exit threshold (-10 + 2 = -8)
assert (
calculate_rating_level(-9.5, threshold_low, threshold_high, previous_rating="LOW", hysteresis=hysteresis)
== "LOW"
)
assert (
calculate_rating_level(-8.5, threshold_low, threshold_high, previous_rating="LOW", hysteresis=hysteresis)
== "LOW"
)
# Exits LOW when exceeding exit threshold
assert (
calculate_rating_level(-7.5, threshold_low, threshold_high, previous_rating="LOW", hysteresis=hysteresis)
== "NORMAL"
)
# With previous state NORMAL: enters LOW at standard threshold
assert (
calculate_rating_level(-10.0, threshold_low, threshold_high, previous_rating="NORMAL", hysteresis=hysteresis)
== "LOW"
)
assert (
calculate_rating_level(-9.5, threshold_low, threshold_high, previous_rating="NORMAL", hysteresis=hysteresis)
== "NORMAL"
)
def test_hysteresis_prevents_flickering_at_high_threshold() -> None:
"""Test that hysteresis prevents rapid switching at HIGH threshold boundary."""
threshold_low = -10.0
threshold_high = 10.0
hysteresis = 2.0
# Without previous state: enters HIGH at threshold
assert calculate_rating_level(10.0, threshold_low, threshold_high) == "HIGH"
# With previous state HIGH: stays HIGH until drops below exit threshold (10 - 2 = 8)
assert (
calculate_rating_level(9.5, threshold_low, threshold_high, previous_rating="HIGH", hysteresis=hysteresis)
== "HIGH"
)
assert (
calculate_rating_level(8.5, threshold_low, threshold_high, previous_rating="HIGH", hysteresis=hysteresis)
== "HIGH"
)
# Exits HIGH when dropping below exit threshold
assert (
calculate_rating_level(7.5, threshold_low, threshold_high, previous_rating="HIGH", hysteresis=hysteresis)
== "NORMAL"
)
# With previous state NORMAL: enters HIGH at standard threshold
assert (
calculate_rating_level(10.0, threshold_low, threshold_high, previous_rating="NORMAL", hysteresis=hysteresis)
== "HIGH"
)
assert (
calculate_rating_level(9.5, threshold_low, threshold_high, previous_rating="NORMAL", hysteresis=hysteresis)
== "NORMAL"
)
def test_hysteresis_allows_direct_transition_low_to_high() -> None:
"""Test that extreme price swings can jump directly from LOW to HIGH."""
threshold_low = -10.0
threshold_high = 10.0
hysteresis = 2.0
# Even when in LOW state, a very high price should transition to HIGH
assert (
calculate_rating_level(15.0, threshold_low, threshold_high, previous_rating="LOW", hysteresis=hysteresis)
== "HIGH"
)
# And vice versa
assert (
calculate_rating_level(-15.0, threshold_low, threshold_high, previous_rating="HIGH", hysteresis=hysteresis)
== "LOW"
)
def test_hysteresis_with_zero_value() -> None:
"""Test that zero hysteresis behaves like original function."""
threshold_low = -10.0
threshold_high = 10.0
hysteresis = 0.0
# With zero hysteresis, should behave exactly like original function
assert (
calculate_rating_level(-10.0, threshold_low, threshold_high, previous_rating="NORMAL", hysteresis=hysteresis)
== "LOW"
)
assert (
calculate_rating_level(-9.9, threshold_low, threshold_high, previous_rating="LOW", hysteresis=hysteresis)
== "NORMAL"
)
def test_hysteresis_sequence_simulation() -> None:
"""Simulate a sequence of price changes to verify hysteresis prevents flickering."""
threshold_low = -10.0
threshold_high = 10.0
hysteresis = 2.0
# Simulate price differences oscillating around -10% threshold
price_differences = [-9.5, -10.2, -9.8, -10.1, -9.9, -10.3, -8.5, -9.0, -7.5]
expected_without_hysteresis = ["NORMAL", "LOW", "NORMAL", "LOW", "NORMAL", "LOW", "NORMAL", "NORMAL", "NORMAL"]
expected_with_hysteresis = ["NORMAL", "LOW", "LOW", "LOW", "LOW", "LOW", "LOW", "LOW", "NORMAL"]
# Without hysteresis: lots of flickering
results_without = [calculate_rating_level(diff, threshold_low, threshold_high) for diff in price_differences]
assert results_without == expected_without_hysteresis
# With hysteresis: stable blocks
results_with: list[str] = []
previous: str | None = None
for diff in price_differences:
rating = calculate_rating_level(
diff, threshold_low, threshold_high, previous_rating=previous, hysteresis=hysteresis
)
results_with.append(rating)
previous = rating
assert results_with == expected_with_hysteresis
# =============================================================================
# Gap Tolerance Tests - Smooth out isolated rating changes
# =============================================================================
def test_gap_tolerance_single_interval() -> None:
"""Test that a single isolated interval gets smoothed out."""
# Create intervals with a single NORMAL surrounded by LOW
intervals = [
{"rating_level": "LOW"},
{"rating_level": "LOW"},
{"rating_level": "NORMAL"}, # Isolated - should be corrected
{"rating_level": "LOW"},
{"rating_level": "LOW"},
]
_apply_rating_gap_tolerance(intervals, gap_tolerance=1)
assert [i["rating_level"] for i in intervals] == ["LOW", "LOW", "LOW", "LOW", "LOW"]
def test_gap_tolerance_two_intervals() -> None:
"""Test that two consecutive isolated intervals get smoothed out with gap_tolerance=2."""
# Two NORMALs surrounded by LOW
intervals = [
{"rating_level": "LOW"},
{"rating_level": "NORMAL"},
{"rating_level": "NORMAL"},
{"rating_level": "LOW"},
]
# With gap_tolerance=1, should NOT be corrected (2 > 1)
_apply_rating_gap_tolerance(intervals, gap_tolerance=1)
assert [i["rating_level"] for i in intervals] == ["LOW", "NORMAL", "NORMAL", "LOW"]
# With gap_tolerance=2, SHOULD be corrected
_apply_rating_gap_tolerance(intervals, gap_tolerance=2)
assert [i["rating_level"] for i in intervals] == ["LOW", "LOW", "LOW", "LOW"]
def test_gap_tolerance_different_surrounding_ratings() -> None:
"""Test that gaps between different ratings are merged into the larger neighbor."""
# NORMAL surrounded by LOW (2) on left, HIGH (2) on right
# Both have equal size, so it should merge into LEFT (earlier in time)
intervals = [
{"rating_level": "LOW"},
{"rating_level": "LOW"},
{"rating_level": "NORMAL"},
{"rating_level": "HIGH"},
{"rating_level": "HIGH"},
]
_apply_rating_gap_tolerance(intervals, gap_tolerance=1)
# With equal neighbors, prefer LEFT (LOW) - single NORMAL merges into LOW
assert [i["rating_level"] for i in intervals] == ["LOW", "LOW", "LOW", "HIGH", "HIGH"]
def test_gap_tolerance_bidirectional_larger_right() -> None:
"""Test that gaps merge into the larger neighboring block (right side)."""
# NORMAL surrounded by LOW (2) on left, HIGH (3) on right
# RIGHT is larger, so NORMAL should merge into HIGH
intervals = [
{"rating_level": "LOW"},
{"rating_level": "LOW"},
{"rating_level": "NORMAL"}, # Should merge into HIGH (larger neighbor)
{"rating_level": "HIGH"},
{"rating_level": "HIGH"},
{"rating_level": "HIGH"},
]
_apply_rating_gap_tolerance(intervals, gap_tolerance=1)
# NORMAL merges into HIGH because HIGH block is larger
assert [i["rating_level"] for i in intervals] == ["LOW", "LOW", "HIGH", "HIGH", "HIGH", "HIGH"]
def test_gap_tolerance_bidirectional_larger_left() -> None:
"""Test that gaps merge into the larger neighboring block (left side)."""
# NORMAL surrounded by LOW (3) on left, HIGH (2) on right
# LEFT is larger, so NORMAL should merge into LOW
intervals = [
{"rating_level": "LOW"},
{"rating_level": "LOW"},
{"rating_level": "LOW"},
{"rating_level": "NORMAL"}, # Should merge into LOW (larger neighbor)
{"rating_level": "HIGH"},
{"rating_level": "HIGH"},
]
_apply_rating_gap_tolerance(intervals, gap_tolerance=1)
# NORMAL merges into LOW because LOW block is larger
assert [i["rating_level"] for i in intervals] == ["LOW", "LOW", "LOW", "LOW", "HIGH", "HIGH"]
def test_gap_tolerance_chain_merge() -> None:
"""
Test the real-world scenario: NORMAL HIGH NORMAL HIGH HIGH HIGH.
This is the 05:30-06:45 scenario where:
- 05:45 is HIGH (single, diff=+14%)
- 06:00 is NORMAL (single, diff=+3.2%)
- 06:15+ is HIGH (large block, diff>+12%)
The algorithm looks for the first LARGE block in each direction,
not just the immediate neighbor. This ensures small blocks are
pulled toward the dominant large block.
"""
intervals = [
{"rating_level": "NORMAL"}, # 05:30
{"rating_level": "HIGH"}, # 05:45 - single HIGH
{"rating_level": "NORMAL"}, # 06:00 - single NORMAL
{"rating_level": "HIGH"}, # 06:15
{"rating_level": "HIGH"}, # 06:30
{"rating_level": "HIGH"}, # 06:45
]
_apply_rating_gap_tolerance(intervals, gap_tolerance=1)
# With the "look through small blocks" logic:
# - HIGH(1) at idx 1: left=NORMAL(1) small, looks further left=nothing → left_pull=(1, NORMAL)
# right=NORMAL(1) small, looks further right=HIGH(3) → right_pull=(3, HIGH)
# Since it's already HIGH and right pull is HIGH, no change needed
# - NORMAL(1) at idx 2: left=HIGH(1) small, looks further left=NORMAL(1) small → left_pull=(1, NORMAL)
# right=HIGH(3) large → right_pull=(3, HIGH)
# Merges into HIGH (larger pull)
# Result: NORMAL(1), HIGH(5)
assert [i["rating_level"] for i in intervals] == [
"NORMAL",
"HIGH",
"HIGH",
"HIGH",
"HIGH",
"HIGH",
]
def test_gap_tolerance_multiple_gaps() -> None:
"""Test that multiple gaps in a sequence get corrected."""
intervals = [
{"rating_level": "LOW"},
{"rating_level": "NORMAL"}, # Gap 1
{"rating_level": "LOW"},
{"rating_level": "LOW"},
{"rating_level": "NORMAL"}, # Gap 2
{"rating_level": "LOW"},
]
_apply_rating_gap_tolerance(intervals, gap_tolerance=1)
assert [i["rating_level"] for i in intervals] == ["LOW", "LOW", "LOW", "LOW", "LOW", "LOW"]
def test_gap_tolerance_disabled() -> None:
"""Test that gap_tolerance=0 disables the feature."""
intervals = [
{"rating_level": "LOW"},
{"rating_level": "NORMAL"},
{"rating_level": "LOW"},
]
_apply_rating_gap_tolerance(intervals, gap_tolerance=0)
# Should remain unchanged
assert [i["rating_level"] for i in intervals] == ["LOW", "NORMAL", "LOW"]
def test_gap_tolerance_with_none_ratings() -> None:
"""Test that None ratings are skipped correctly."""
intervals = [
{"rating_level": None}, # Skipped
{"rating_level": "LOW"},
{"rating_level": "NORMAL"},
{"rating_level": "LOW"},
{"rating_level": None}, # Skipped
]
_apply_rating_gap_tolerance(intervals, gap_tolerance=1)
assert intervals[0]["rating_level"] is None
assert intervals[1]["rating_level"] == "LOW"
assert intervals[2]["rating_level"] == "LOW" # Corrected
assert intervals[3]["rating_level"] == "LOW"
assert intervals[4]["rating_level"] is None
def test_gap_tolerance_high_rating_gaps() -> None:
"""Test gap tolerance for HIGH ratings."""
intervals = [
{"rating_level": "HIGH"},
{"rating_level": "NORMAL"}, # Isolated
{"rating_level": "HIGH"},
{"rating_level": "HIGH"},
]
_apply_rating_gap_tolerance(intervals, gap_tolerance=1)
assert [i["rating_level"] for i in intervals] == ["HIGH", "HIGH", "HIGH", "HIGH"]