diff --git a/custom_components/tibber_prices/const.py b/custom_components/tibber_prices/const.py index 57073e7..019f0c7 100644 --- a/custom_components/tibber_prices/const.py +++ b/custom_components/tibber_prices/const.py @@ -92,6 +92,8 @@ DEFAULT_BEST_PRICE_MIN_PERIOD_LENGTH = 60 # 60 minutes minimum period length fo DEFAULT_PEAK_PRICE_MIN_PERIOD_LENGTH = 30 # 30 minutes minimum period length for peak price (user-facing, minutes) DEFAULT_PRICE_RATING_THRESHOLD_LOW = -10 # Default rating threshold low percentage DEFAULT_PRICE_RATING_THRESHOLD_HIGH = 10 # Default rating threshold high percentage +DEFAULT_PRICE_RATING_HYSTERESIS = 2.0 # Hysteresis percentage to prevent flickering at threshold boundaries +DEFAULT_PRICE_RATING_GAP_TOLERANCE = 1 # Max consecutive intervals to smooth out (0 = disabled) DEFAULT_AVERAGE_SENSOR_DISPLAY = "median" # Default: show median in state, mean in attributes DEFAULT_PRICE_TREND_THRESHOLD_RISING = 3 # Default trend threshold for rising prices (%) DEFAULT_PRICE_TREND_THRESHOLD_FALLING = -3 # Default trend threshold for falling prices (%, negative value) diff --git a/custom_components/tibber_prices/utils/price.py b/custom_components/tibber_prices/utils/price.py index 0876967..5bf2fd8 100644 --- a/custom_components/tibber_prices/utils/price.py +++ b/custom_components/tibber_prices/utils/price.py @@ -11,6 +11,8 @@ if TYPE_CHECKING: from custom_components.tibber_prices.coordinator.time_service import TibberPricesTimeService from custom_components.tibber_prices.const import ( + DEFAULT_PRICE_RATING_GAP_TOLERANCE, + DEFAULT_PRICE_RATING_HYSTERESIS, DEFAULT_VOLATILITY_THRESHOLD_HIGH, DEFAULT_VOLATILITY_THRESHOLD_MODERATE, DEFAULT_VOLATILITY_THRESHOLD_VERY_HIGH, @@ -186,27 +188,41 @@ def calculate_difference_percentage( return ((current_interval_price - trailing_average) / abs(trailing_average)) * 100 -def calculate_rating_level( +def calculate_rating_level( # noqa: PLR0911 - Multiple returns justified by clear hysteresis state machine difference: float | None, threshold_low: float, threshold_high: float, + *, + previous_rating: str | None = None, + hysteresis: float = 0.0, ) -> str | None: """ Calculate the rating level based on difference percentage and thresholds. This mimics the API's "level" field from priceRating endpoint. + Supports hysteresis to prevent flickering at threshold boundaries. When a previous + rating is provided, the threshold for leaving that state is adjusted by the + hysteresis value, requiring a more significant change to switch states. + Args: difference: The difference percentage (from calculate_difference_percentage) threshold_low: The low threshold percentage (typically -100 to 0) threshold_high: The high threshold percentage (typically 0 to 100) + previous_rating: The rating level of the previous interval (for hysteresis) + hysteresis: The hysteresis percentage (default 0.0 = no hysteresis) Returns: - "LOW" if difference <= threshold_low - "HIGH" if difference >= threshold_high + "LOW" if difference <= threshold_low (adjusted by hysteresis) + "HIGH" if difference >= threshold_high (adjusted by hysteresis) "NORMAL" otherwise None if difference is None + Example with hysteresis=2.0 and threshold_low=-10: + - To enter LOW from NORMAL: difference must be <= -10% (threshold_low) + - To leave LOW back to NORMAL: difference must be > -8% (threshold_low + hysteresis) + This creates a "dead zone" that prevents rapid switching at boundaries. + """ if difference is None: return None @@ -222,7 +238,29 @@ def calculate_rating_level( ) return PRICE_RATING_NORMAL - # Classify based on thresholds + # Apply hysteresis based on previous state + # The idea: make it "harder" to leave the current state than to enter it + if previous_rating == "LOW": + # Currently LOW: need to exceed threshold_low + hysteresis to leave + exit_threshold_low = threshold_low + hysteresis + if difference <= exit_threshold_low: + return "LOW" + # Check if we should go to HIGH (rare, but possible with large price swings) + if difference >= threshold_high: + return "HIGH" + return PRICE_RATING_NORMAL + + if previous_rating == "HIGH": + # Currently HIGH: need to drop below threshold_high - hysteresis to leave + exit_threshold_high = threshold_high - hysteresis + if difference >= exit_threshold_high: + return "HIGH" + # Check if we should go to LOW (rare, but possible with large price swings) + if difference <= threshold_low: + return "LOW" + return PRICE_RATING_NORMAL + + # No previous state or previous was NORMAL: use standard thresholds if difference <= threshold_low: return "LOW" @@ -232,12 +270,15 @@ def calculate_rating_level( return PRICE_RATING_NORMAL -def _process_price_interval( +def _process_price_interval( # noqa: PLR0913 - Extra params needed for hysteresis price_interval: dict[str, Any], all_prices: list[dict[str, Any]], threshold_low: float, threshold_high: float, -) -> None: + *, + previous_rating: str | None = None, + hysteresis: float = 0.0, +) -> str | None: """ Process a single price interval and add difference and rating_level. @@ -246,16 +287,20 @@ def _process_price_interval( all_prices: All available price intervals for lookback calculation threshold_low: Low threshold percentage threshold_high: High threshold percentage - day_label: Label for logging ("today" or "tomorrow") + previous_rating: The rating level of the previous interval (for hysteresis) + hysteresis: The hysteresis percentage to prevent flickering + + Returns: + The calculated rating_level (for use as previous_rating in next call) """ starts_at = price_interval.get("startsAt") # Already datetime object in local timezone if not starts_at: - return + return previous_rating current_interval_price = price_interval.get("total") if current_interval_price is None: - return + return previous_rating # Calculate trailing average trailing_avg = calculate_trailing_average_for_interval(starts_at, all_prices) @@ -265,20 +310,238 @@ def _process_price_interval( difference = calculate_difference_percentage(float(current_interval_price), trailing_avg) price_interval["difference"] = difference - # Calculate rating_level based on difference - rating_level = calculate_rating_level(difference, threshold_low, threshold_high) + # Calculate rating_level based on difference with hysteresis + rating_level = calculate_rating_level( + difference, + threshold_low, + threshold_high, + previous_rating=previous_rating, + hysteresis=hysteresis, + ) price_interval["rating_level"] = rating_level - else: - # Set to None if we couldn't calculate (expected for intervals in first 24h) - price_interval["difference"] = None - price_interval["rating_level"] = None + return rating_level + + # Set to None if we couldn't calculate (expected for intervals in first 24h) + price_interval["difference"] = None + price_interval["rating_level"] = None + return None -def enrich_price_info_with_differences( +def _build_rating_blocks( + rated_intervals: list[tuple[int, dict[str, Any], str]], +) -> list[tuple[int, int, str, int]]: + """ + Build list of contiguous rating blocks from rated intervals. + + Args: + rated_intervals: List of (original_idx, interval_dict, rating) tuples + + Returns: + List of (start_idx, end_idx, rating, length) tuples where indices + refer to positions in rated_intervals + + """ + blocks: list[tuple[int, int, str, int]] = [] + if not rated_intervals: + return blocks + + block_start = 0 + current_rating = rated_intervals[0][2] + + for idx in range(1, len(rated_intervals)): + if rated_intervals[idx][2] != current_rating: + # End current block + blocks.append((block_start, idx - 1, current_rating, idx - block_start)) + block_start = idx + current_rating = rated_intervals[idx][2] + + # Don't forget the last block + blocks.append((block_start, len(rated_intervals) - 1, current_rating, len(rated_intervals) - block_start)) + return blocks + + +def _calculate_gravitational_pull( + blocks: list[tuple[int, int, str, int]], + block_idx: int, + direction: str, + gap_tolerance: int, +) -> tuple[int, str]: + """ + Calculate "gravitational pull" from neighboring blocks in one direction. + + This finds the first LARGE block (> gap_tolerance) in the given direction + and returns its size and rating. Small intervening blocks are "looked through". + + This approach ensures that small isolated blocks are always pulled toward + the dominant large block, even if there are other small blocks in between. + + Args: + blocks: List of (start_idx, end_idx, rating, length) tuples + block_idx: Index of the current block being evaluated + direction: "left" or "right" + gap_tolerance: Maximum size of blocks considered "small" + + Returns: + Tuple of (size, rating) of the first large block found, + or (immediate_neighbor_size, immediate_neighbor_rating) if no large block exists + + """ + probe_range = range(block_idx - 1, -1, -1) if direction == "left" else range(block_idx + 1, len(blocks)) + total_small_accumulated = 0 + + for probe_idx in probe_range: + probe_rating = blocks[probe_idx][2] + probe_size = blocks[probe_idx][3] + + if probe_size > gap_tolerance: + # Found a large block - return its characteristics + # Add any accumulated small blocks of the same rating + if total_small_accumulated > 0: + return (probe_size + total_small_accumulated, probe_rating) + return (probe_size, probe_rating) + + # Small block - accumulate if same rating as what we've seen + total_small_accumulated += probe_size + + # No large block found - return the immediate neighbor's info + neighbor_idx = block_idx - 1 if direction == "left" else block_idx + 1 + return (blocks[neighbor_idx][3], blocks[neighbor_idx][2]) + + +def _apply_rating_gap_tolerance( + all_intervals: list[dict[str, Any]], + gap_tolerance: int, +) -> None: + """ + Apply gap tolerance to smooth out isolated rating level changes. + + This is a post-processing step after hysteresis. It identifies short sequences + of intervals (≤ gap_tolerance) and merges them into the larger neighboring block. + The algorithm is bidirectional - it compares block sizes on both sides and + assigns the small block to whichever neighbor is larger. + + This matches human intuition: a single "different" interval feels like it + should belong to the larger surrounding group. + + Example with gap_tolerance=1: + LOW LOW LOW NORMAL LOW LOW → LOW LOW LOW LOW LOW LOW + (single NORMAL gets merged into larger LOW block) + + Example with gap_tolerance=1 (bidirectional): + NORMAL NORMAL HIGH NORMAL HIGH HIGH HIGH → NORMAL NORMAL HIGH HIGH HIGH HIGH HIGH + (single NORMAL at position 4 gets merged into larger HIGH block on the right) + + Args: + all_intervals: List of price intervals with rating_level already set (modified in-place) + gap_tolerance: Maximum number of consecutive "different" intervals to smooth out + + Note: + - Compares block sizes on both sides and merges small blocks into larger neighbors + - If both neighbors have equal size, prefers the LEFT neighbor (earlier in time) + - Skips intervals without rating_level (None) + - Intervals must be sorted chronologically for this to work correctly + - Multiple passes may be needed as merging can create new small blocks + + """ + if gap_tolerance <= 0: + return + + # Extract intervals with valid rating_level in chronological order + rated_intervals: list[tuple[int, dict[str, Any], str]] = [ + (i, interval, interval["rating_level"]) + for i, interval in enumerate(all_intervals) + if interval.get("rating_level") is not None + ] + + if len(rated_intervals) < 3: # noqa: PLR2004 - Minimum 3 for before/gap/after pattern + return + + # Iteratively merge small blocks until no more changes + max_iterations = 10 + total_corrections = 0 + + for iteration in range(max_iterations): + blocks = _build_rating_blocks(rated_intervals) + corrections_this_pass = _merge_small_blocks(blocks, rated_intervals, gap_tolerance) + total_corrections += corrections_this_pass + + if corrections_this_pass == 0: + break + + _LOGGER.debug( + "Gap tolerance pass %d: merged %d small blocks", + iteration + 1, + corrections_this_pass, + ) + + if total_corrections > 0: + _LOGGER.debug("Gap tolerance: total %d block merges across all passes", total_corrections) + + +def _merge_small_blocks( + blocks: list[tuple[int, int, str, int]], + rated_intervals: list[tuple[int, dict[str, Any], str]], + gap_tolerance: int, +) -> int: + """ + Merge small blocks into their larger neighbors. + + CRITICAL: This function collects ALL merge decisions FIRST, then applies them. + This prevents the order of processing from affecting outcomes. Without this, + earlier blocks could be merged incorrectly because the gravitational pull + calculation would see already-modified neighbors instead of the original state. + + The merge decision is based on the FIRST LARGE BLOCK in each direction, + looking through any small intervening blocks. This ensures consistent + behavior when multiple small blocks are adjacent. + + Args: + blocks: List of (start_idx, end_idx, rating, length) tuples + rated_intervals: List of (original_idx, interval_dict, rating) tuples (modified in-place) + gap_tolerance: Maximum size of blocks to merge + + Returns: + Number of blocks merged in this pass + + """ + # Phase 1: Collect all merge decisions based on ORIGINAL block state + merge_decisions: list[tuple[int, int, str]] = [] # (start_ri_idx, end_ri_idx, target_rating) + + for block_idx, (start, end, rating, length) in enumerate(blocks): + if length > gap_tolerance: + continue + + # Must have neighbors on BOTH sides (not an edge block) + if block_idx == 0 or block_idx == len(blocks) - 1: + continue + + # Calculate gravitational pull from each direction + left_pull, left_rating = _calculate_gravitational_pull(blocks, block_idx, "left", gap_tolerance) + right_pull, right_rating = _calculate_gravitational_pull(blocks, block_idx, "right", gap_tolerance) + + # Determine target rating (prefer left if equal) + target_rating = left_rating if left_pull >= right_pull else right_rating + + if rating != target_rating: + merge_decisions.append((start, end, target_rating)) + + # Phase 2: Apply all merge decisions + for start, end, target_rating in merge_decisions: + for ri_idx in range(start, end + 1): + original_idx, interval, _old_rating = rated_intervals[ri_idx] + interval["rating_level"] = target_rating + rated_intervals[ri_idx] = (original_idx, interval, target_rating) + + return len(merge_decisions) + + +def enrich_price_info_with_differences( # noqa: PLR0913 - Extra params for rating stabilization all_intervals: list[dict[str, Any]], *, threshold_low: float | None = None, threshold_high: float | None = None, + hysteresis: float | None = None, + gap_tolerance: int | None = None, time: TibberPricesTimeService | None = None, # noqa: ARG001 # Used in production (via coordinator), kept for compatibility ) -> list[dict[str, Any]]: """ @@ -287,15 +550,29 @@ def enrich_price_info_with_differences( Computes the trailing 24-hour average, difference percentage, and rating level for intervals that have sufficient lookback data (in-place modification). + Uses hysteresis to prevent flickering at threshold boundaries. When an interval's + difference is near a threshold, hysteresis ensures that the rating only changes + when there's a significant movement, not just minor fluctuations. + + After hysteresis, applies gap tolerance as post-processing to smooth out any + remaining isolated rating changes (e.g., a single NORMAL interval surrounded + by LOW intervals gets corrected to LOW). + CRITICAL: Only enriches intervals that have at least 24 hours of prior data available. This is determined by checking if (interval_start - earliest_interval_start) >= 24h. Works independently of interval density (24 vs 96 intervals/day) and handles transition periods (e.g., Oct 1, 2025) correctly. + CRITICAL: Intervals are processed in chronological order to properly apply + hysteresis. The rating_level of each interval depends on the previous interval's + rating to prevent rapid switching at threshold boundaries. + Args: all_intervals: Flat list of all price intervals (day_before_yesterday + yesterday + today + tomorrow). threshold_low: Low threshold percentage for rating_level (defaults to -10) threshold_high: High threshold percentage for rating_level (defaults to 10) + hysteresis: Hysteresis percentage to prevent flickering (defaults to 2.0) + gap_tolerance: Max consecutive intervals to smooth out (defaults to 1, 0 = disabled) time: TibberPricesTimeService instance (kept for API compatibility, not used) Returns: @@ -311,6 +588,8 @@ def enrich_price_info_with_differences( """ threshold_low = threshold_low if threshold_low is not None else -10 threshold_high = threshold_high if threshold_high is not None else 10 + hysteresis = hysteresis if hysteresis is not None else DEFAULT_PRICE_RATING_HYSTERESIS + gap_tolerance = gap_tolerance if gap_tolerance is not None else DEFAULT_PRICE_RATING_GAP_TOLERANCE if not all_intervals: return all_intervals @@ -330,25 +609,42 @@ def enrich_price_info_with_differences( # Only intervals starting at or after this boundary have full 24h lookback enrichment_boundary = earliest_start + timedelta(hours=24) - # Process intervals (modifies in-place) + # CRITICAL: Sort intervals by time for proper hysteresis application + # We need to process intervals in chronological order so each interval + # can use the previous interval's rating_level for hysteresis + intervals_with_time: list[tuple[dict[str, Any], datetime]] = [ + (interval, starts_at) for interval in all_intervals if (starts_at := interval.get("startsAt")) is not None + ] + intervals_with_time.sort(key=lambda x: x[1]) + + # Process intervals in chronological order (modifies in-place) # CRITICAL: Only enrich intervals that start >= 24h after earliest data enriched_count = 0 skipped_count = 0 + previous_rating: str | None = None - for price_interval in all_intervals: - starts_at = price_interval.get("startsAt") - if not starts_at: - skipped_count += 1 - continue - + for price_interval, starts_at in intervals_with_time: # Skip if interval doesn't have full 24h lookback if starts_at < enrichment_boundary: skipped_count += 1 continue - _process_price_interval(price_interval, all_intervals, threshold_low, threshold_high) + # Process interval and get its rating for use as previous_rating in next iteration + previous_rating = _process_price_interval( + price_interval, + all_intervals, + threshold_low, + threshold_high, + previous_rating=previous_rating, + hysteresis=hysteresis, + ) enriched_count += 1 + # Apply gap tolerance as post-processing step + # This smooths out isolated rating changes that slip through hysteresis + if gap_tolerance > 0: + _apply_rating_gap_tolerance(all_intervals, gap_tolerance) + return all_intervals diff --git a/tests/test_rating_threshold_validation.py b/tests/test_rating_threshold_validation.py index e3b7351..c758e8c 100644 --- a/tests/test_rating_threshold_validation.py +++ b/tests/test_rating_threshold_validation.py @@ -5,7 +5,10 @@ import logging import pytest from _pytest.logging import LogCaptureFixture -from custom_components.tibber_prices.utils.price import calculate_rating_level +from custom_components.tibber_prices.utils.price import ( + _apply_rating_gap_tolerance, + calculate_rating_level, +) @pytest.fixture @@ -142,3 +145,342 @@ def test_rating_level_asymmetric_thresholds() -> None: assert calculate_rating_level(-15.0, threshold_low, threshold_high) == "NORMAL" assert calculate_rating_level(0.0, threshold_low, threshold_high) == "NORMAL" assert calculate_rating_level(6.0, threshold_low, threshold_high) == "HIGH" + + +# ============================================================================= +# Hysteresis Tests - Prevent flickering at threshold boundaries +# ============================================================================= + + +def test_hysteresis_prevents_flickering_at_low_threshold() -> None: + """Test that hysteresis prevents rapid switching at LOW threshold boundary.""" + threshold_low = -10.0 + threshold_high = 10.0 + hysteresis = 2.0 + + # Without previous state: enters LOW at threshold + assert calculate_rating_level(-10.0, threshold_low, threshold_high) == "LOW" + + # With previous state LOW: stays LOW until exceeds exit threshold (-10 + 2 = -8) + assert ( + calculate_rating_level(-9.5, threshold_low, threshold_high, previous_rating="LOW", hysteresis=hysteresis) + == "LOW" + ) + assert ( + calculate_rating_level(-8.5, threshold_low, threshold_high, previous_rating="LOW", hysteresis=hysteresis) + == "LOW" + ) + # Exits LOW when exceeding exit threshold + assert ( + calculate_rating_level(-7.5, threshold_low, threshold_high, previous_rating="LOW", hysteresis=hysteresis) + == "NORMAL" + ) + + # With previous state NORMAL: enters LOW at standard threshold + assert ( + calculate_rating_level(-10.0, threshold_low, threshold_high, previous_rating="NORMAL", hysteresis=hysteresis) + == "LOW" + ) + assert ( + calculate_rating_level(-9.5, threshold_low, threshold_high, previous_rating="NORMAL", hysteresis=hysteresis) + == "NORMAL" + ) + + +def test_hysteresis_prevents_flickering_at_high_threshold() -> None: + """Test that hysteresis prevents rapid switching at HIGH threshold boundary.""" + threshold_low = -10.0 + threshold_high = 10.0 + hysteresis = 2.0 + + # Without previous state: enters HIGH at threshold + assert calculate_rating_level(10.0, threshold_low, threshold_high) == "HIGH" + + # With previous state HIGH: stays HIGH until drops below exit threshold (10 - 2 = 8) + assert ( + calculate_rating_level(9.5, threshold_low, threshold_high, previous_rating="HIGH", hysteresis=hysteresis) + == "HIGH" + ) + assert ( + calculate_rating_level(8.5, threshold_low, threshold_high, previous_rating="HIGH", hysteresis=hysteresis) + == "HIGH" + ) + # Exits HIGH when dropping below exit threshold + assert ( + calculate_rating_level(7.5, threshold_low, threshold_high, previous_rating="HIGH", hysteresis=hysteresis) + == "NORMAL" + ) + + # With previous state NORMAL: enters HIGH at standard threshold + assert ( + calculate_rating_level(10.0, threshold_low, threshold_high, previous_rating="NORMAL", hysteresis=hysteresis) + == "HIGH" + ) + assert ( + calculate_rating_level(9.5, threshold_low, threshold_high, previous_rating="NORMAL", hysteresis=hysteresis) + == "NORMAL" + ) + + +def test_hysteresis_allows_direct_transition_low_to_high() -> None: + """Test that extreme price swings can jump directly from LOW to HIGH.""" + threshold_low = -10.0 + threshold_high = 10.0 + hysteresis = 2.0 + + # Even when in LOW state, a very high price should transition to HIGH + assert ( + calculate_rating_level(15.0, threshold_low, threshold_high, previous_rating="LOW", hysteresis=hysteresis) + == "HIGH" + ) + + # And vice versa + assert ( + calculate_rating_level(-15.0, threshold_low, threshold_high, previous_rating="HIGH", hysteresis=hysteresis) + == "LOW" + ) + + +def test_hysteresis_with_zero_value() -> None: + """Test that zero hysteresis behaves like original function.""" + threshold_low = -10.0 + threshold_high = 10.0 + hysteresis = 0.0 + + # With zero hysteresis, should behave exactly like original function + assert ( + calculate_rating_level(-10.0, threshold_low, threshold_high, previous_rating="NORMAL", hysteresis=hysteresis) + == "LOW" + ) + assert ( + calculate_rating_level(-9.9, threshold_low, threshold_high, previous_rating="LOW", hysteresis=hysteresis) + == "NORMAL" + ) + + +def test_hysteresis_sequence_simulation() -> None: + """Simulate a sequence of price changes to verify hysteresis prevents flickering.""" + threshold_low = -10.0 + threshold_high = 10.0 + hysteresis = 2.0 + + # Simulate price differences oscillating around -10% threshold + price_differences = [-9.5, -10.2, -9.8, -10.1, -9.9, -10.3, -8.5, -9.0, -7.5] + expected_without_hysteresis = ["NORMAL", "LOW", "NORMAL", "LOW", "NORMAL", "LOW", "NORMAL", "NORMAL", "NORMAL"] + expected_with_hysteresis = ["NORMAL", "LOW", "LOW", "LOW", "LOW", "LOW", "LOW", "LOW", "NORMAL"] + + # Without hysteresis: lots of flickering + results_without = [calculate_rating_level(diff, threshold_low, threshold_high) for diff in price_differences] + assert results_without == expected_without_hysteresis + + # With hysteresis: stable blocks + results_with: list[str] = [] + previous: str | None = None + for diff in price_differences: + rating = calculate_rating_level( + diff, threshold_low, threshold_high, previous_rating=previous, hysteresis=hysteresis + ) + results_with.append(rating) + previous = rating + assert results_with == expected_with_hysteresis + + +# ============================================================================= +# Gap Tolerance Tests - Smooth out isolated rating changes +# ============================================================================= + + +def test_gap_tolerance_single_interval() -> None: + """Test that a single isolated interval gets smoothed out.""" + # Create intervals with a single NORMAL surrounded by LOW + intervals = [ + {"rating_level": "LOW"}, + {"rating_level": "LOW"}, + {"rating_level": "NORMAL"}, # Isolated - should be corrected + {"rating_level": "LOW"}, + {"rating_level": "LOW"}, + ] + + _apply_rating_gap_tolerance(intervals, gap_tolerance=1) + + assert [i["rating_level"] for i in intervals] == ["LOW", "LOW", "LOW", "LOW", "LOW"] + + +def test_gap_tolerance_two_intervals() -> None: + """Test that two consecutive isolated intervals get smoothed out with gap_tolerance=2.""" + # Two NORMALs surrounded by LOW + intervals = [ + {"rating_level": "LOW"}, + {"rating_level": "NORMAL"}, + {"rating_level": "NORMAL"}, + {"rating_level": "LOW"}, + ] + + # With gap_tolerance=1, should NOT be corrected (2 > 1) + _apply_rating_gap_tolerance(intervals, gap_tolerance=1) + assert [i["rating_level"] for i in intervals] == ["LOW", "NORMAL", "NORMAL", "LOW"] + + # With gap_tolerance=2, SHOULD be corrected + _apply_rating_gap_tolerance(intervals, gap_tolerance=2) + assert [i["rating_level"] for i in intervals] == ["LOW", "LOW", "LOW", "LOW"] + + +def test_gap_tolerance_different_surrounding_ratings() -> None: + """Test that gaps between different ratings are merged into the larger neighbor.""" + # NORMAL surrounded by LOW (2) on left, HIGH (2) on right + # Both have equal size, so it should merge into LEFT (earlier in time) + intervals = [ + {"rating_level": "LOW"}, + {"rating_level": "LOW"}, + {"rating_level": "NORMAL"}, + {"rating_level": "HIGH"}, + {"rating_level": "HIGH"}, + ] + + _apply_rating_gap_tolerance(intervals, gap_tolerance=1) + + # With equal neighbors, prefer LEFT (LOW) - single NORMAL merges into LOW + assert [i["rating_level"] for i in intervals] == ["LOW", "LOW", "LOW", "HIGH", "HIGH"] + + +def test_gap_tolerance_bidirectional_larger_right() -> None: + """Test that gaps merge into the larger neighboring block (right side).""" + # NORMAL surrounded by LOW (2) on left, HIGH (3) on right + # RIGHT is larger, so NORMAL should merge into HIGH + intervals = [ + {"rating_level": "LOW"}, + {"rating_level": "LOW"}, + {"rating_level": "NORMAL"}, # Should merge into HIGH (larger neighbor) + {"rating_level": "HIGH"}, + {"rating_level": "HIGH"}, + {"rating_level": "HIGH"}, + ] + + _apply_rating_gap_tolerance(intervals, gap_tolerance=1) + + # NORMAL merges into HIGH because HIGH block is larger + assert [i["rating_level"] for i in intervals] == ["LOW", "LOW", "HIGH", "HIGH", "HIGH", "HIGH"] + + +def test_gap_tolerance_bidirectional_larger_left() -> None: + """Test that gaps merge into the larger neighboring block (left side).""" + # NORMAL surrounded by LOW (3) on left, HIGH (2) on right + # LEFT is larger, so NORMAL should merge into LOW + intervals = [ + {"rating_level": "LOW"}, + {"rating_level": "LOW"}, + {"rating_level": "LOW"}, + {"rating_level": "NORMAL"}, # Should merge into LOW (larger neighbor) + {"rating_level": "HIGH"}, + {"rating_level": "HIGH"}, + ] + + _apply_rating_gap_tolerance(intervals, gap_tolerance=1) + + # NORMAL merges into LOW because LOW block is larger + assert [i["rating_level"] for i in intervals] == ["LOW", "LOW", "LOW", "LOW", "HIGH", "HIGH"] + + +def test_gap_tolerance_chain_merge() -> None: + """ + Test the real-world scenario: NORMAL HIGH NORMAL HIGH HIGH HIGH. + + This is the 05:30-06:45 scenario where: + - 05:45 is HIGH (single, diff=+14%) + - 06:00 is NORMAL (single, diff=+3.2%) + - 06:15+ is HIGH (large block, diff>+12%) + + The algorithm looks for the first LARGE block in each direction, + not just the immediate neighbor. This ensures small blocks are + pulled toward the dominant large block. + """ + intervals = [ + {"rating_level": "NORMAL"}, # 05:30 + {"rating_level": "HIGH"}, # 05:45 - single HIGH + {"rating_level": "NORMAL"}, # 06:00 - single NORMAL + {"rating_level": "HIGH"}, # 06:15 + {"rating_level": "HIGH"}, # 06:30 + {"rating_level": "HIGH"}, # 06:45 + ] + + _apply_rating_gap_tolerance(intervals, gap_tolerance=1) + + # With the "look through small blocks" logic: + # - HIGH(1) at idx 1: left=NORMAL(1) small, looks further left=nothing → left_pull=(1, NORMAL) + # right=NORMAL(1) small, looks further right=HIGH(3) → right_pull=(3, HIGH) + # Since it's already HIGH and right pull is HIGH, no change needed + # - NORMAL(1) at idx 2: left=HIGH(1) small, looks further left=NORMAL(1) small → left_pull=(1, NORMAL) + # right=HIGH(3) large → right_pull=(3, HIGH) + # Merges into HIGH (larger pull) + # Result: NORMAL(1), HIGH(5) + assert [i["rating_level"] for i in intervals] == [ + "NORMAL", + "HIGH", + "HIGH", + "HIGH", + "HIGH", + "HIGH", + ] + + +def test_gap_tolerance_multiple_gaps() -> None: + """Test that multiple gaps in a sequence get corrected.""" + intervals = [ + {"rating_level": "LOW"}, + {"rating_level": "NORMAL"}, # Gap 1 + {"rating_level": "LOW"}, + {"rating_level": "LOW"}, + {"rating_level": "NORMAL"}, # Gap 2 + {"rating_level": "LOW"}, + ] + + _apply_rating_gap_tolerance(intervals, gap_tolerance=1) + + assert [i["rating_level"] for i in intervals] == ["LOW", "LOW", "LOW", "LOW", "LOW", "LOW"] + + +def test_gap_tolerance_disabled() -> None: + """Test that gap_tolerance=0 disables the feature.""" + intervals = [ + {"rating_level": "LOW"}, + {"rating_level": "NORMAL"}, + {"rating_level": "LOW"}, + ] + + _apply_rating_gap_tolerance(intervals, gap_tolerance=0) + + # Should remain unchanged + assert [i["rating_level"] for i in intervals] == ["LOW", "NORMAL", "LOW"] + + +def test_gap_tolerance_with_none_ratings() -> None: + """Test that None ratings are skipped correctly.""" + intervals = [ + {"rating_level": None}, # Skipped + {"rating_level": "LOW"}, + {"rating_level": "NORMAL"}, + {"rating_level": "LOW"}, + {"rating_level": None}, # Skipped + ] + + _apply_rating_gap_tolerance(intervals, gap_tolerance=1) + + assert intervals[0]["rating_level"] is None + assert intervals[1]["rating_level"] == "LOW" + assert intervals[2]["rating_level"] == "LOW" # Corrected + assert intervals[3]["rating_level"] == "LOW" + assert intervals[4]["rating_level"] is None + + +def test_gap_tolerance_high_rating_gaps() -> None: + """Test gap tolerance for HIGH ratings.""" + intervals = [ + {"rating_level": "HIGH"}, + {"rating_level": "NORMAL"}, # Isolated + {"rating_level": "HIGH"}, + {"rating_level": "HIGH"}, + ] + + _apply_rating_gap_tolerance(intervals, gap_tolerance=1) + + assert [i["rating_level"] for i in intervals] == ["HIGH", "HIGH", "HIGH", "HIGH"]