From 568fb219f273a2006791dc920488c6ed564afb8e Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 1 Dec 2025 01:23:42 +0000 Subject: [PATCH] Add connect_segments parameter to get_chartdata service for visual segment connections Co-authored-by: jpawlowski <75446+jpawlowski@users.noreply.github.com> --- custom_components/tibber_prices/services.yaml | 8 + .../tibber_prices/services/get_chartdata.py | 73 ++++++-- .../tibber_prices/translations/en.json | 4 + tests/test_connect_segments.py | 158 ++++++++++++++++++ 4 files changed, 231 insertions(+), 12 deletions(-) create mode 100644 tests/test_connect_segments.py diff --git a/custom_components/tibber_prices/services.yaml b/custom_components/tibber_prices/services.yaml index f6eeb3e..b9b7ff5 100644 --- a/custom_components/tibber_prices/services.yaml +++ b/custom_components/tibber_prices/services.yaml @@ -158,6 +158,14 @@ get_chartdata: - segments - all translation_key: insert_nulls + connect_segments: + name: Connect Segments + description: >- + [ONLY WITH insert_nulls='segments'] When enabled, adds connecting points at segment boundaries to visually connect different price level segments in stepline charts. When price goes DOWN at a boundary, adds a point with the lower price at the end of the current segment. When price goes UP, adds a hold point before the gap. This creates smooth visual transitions between segments instead of abrupt gaps. + required: false + default: false + selector: + boolean: # === OUTPUT FORMAT === output_format: name: Output Format diff --git a/custom_components/tibber_prices/services/get_chartdata.py b/custom_components/tibber_prices/services/get_chartdata.py index 5915078..456afa1 100644 --- a/custom_components/tibber_prices/services/get_chartdata.py +++ b/custom_components/tibber_prices/services/get_chartdata.py @@ -92,6 +92,7 @@ CHARTDATA_SERVICE_SCHEMA: Final = vol.Schema( [vol.In([PRICE_RATING_LOW, PRICE_RATING_NORMAL, PRICE_RATING_HIGH])], ), vol.Optional("insert_nulls", default="none"): vol.In(["none", "segments", "all"]), + vol.Optional("connect_segments", default=False): bool, vol.Optional("add_trailing_null", default=False): bool, vol.Optional("period_filter"): vol.In(["best_price", "peak_price"]), vol.Optional("start_time_field", default="start_time"): str, @@ -156,6 +157,7 @@ async def handle_chartdata(call: ServiceCall) -> dict[str, Any]: # noqa: PLR091 include_rating_level = call.data.get("include_rating_level", False) include_average = call.data.get("include_average", False) insert_nulls = call.data.get("insert_nulls", "none") + connect_segments = call.data.get("connect_segments", False) add_trailing_null = call.data.get("add_trailing_null", False) period_filter = call.data.get("period_filter") # Filter values are already normalized to uppercase by schema validators @@ -336,6 +338,7 @@ async def handle_chartdata(call: ServiceCall) -> dict[str, Any]: # noqa: PLR091 start_time = interval.get("startsAt") price = interval.get("total") + next_price = next_interval.get("total") next_start_time = next_interval.get("startsAt") if start_time is None or price is None: @@ -370,24 +373,70 @@ async def handle_chartdata(call: ServiceCall) -> dict[str, Any]: # noqa: PLR091 # Check if next interval is different level (segment boundary) if next_value != interval_value: - # Hold current price until next timestamp (stepline effect) next_start_serialized = ( next_start_time.isoformat() if next_start_time and hasattr(next_start_time, "isoformat") else next_start_time ) - hold_point = {start_time_field: next_start_serialized, price_field: converted_price} - if include_level and "level" in interval: - hold_point[level_field] = interval["level"] - if include_rating_level and "rating_level" in interval: - hold_point[rating_level_field] = interval["rating_level"] - if include_average and day in day_averages: - hold_point[average_field] = day_averages[day] - chart_data.append(hold_point) - # Add NULL point to create gap - null_point = {start_time_field: next_start_serialized, price_field: None} - chart_data.append(null_point) + if connect_segments and next_price is not None: + # Connect segments visually by adding transition points + # Convert next price for comparison and use + converted_next_price = ( + round(next_price * 100, 2) if minor_currency else round(next_price, 4) + ) + if round_decimals is not None: + converted_next_price = round(converted_next_price, round_decimals) + + if next_price < price: + # Price goes DOWN: Add point at end of current segment with lower price + # This draws the line downward from current level + connect_point = { + start_time_field: next_start_serialized, + price_field: converted_next_price, + } + if include_level and "level" in interval: + connect_point[level_field] = interval["level"] + if include_rating_level and "rating_level" in interval: + connect_point[rating_level_field] = interval["rating_level"] + if include_average and day in day_averages: + connect_point[average_field] = day_averages[day] + chart_data.append(connect_point) + else: + # Price goes UP or stays same: Add hold point with current price + # Then add connect point at next level with higher price + hold_point = { + start_time_field: next_start_serialized, + price_field: converted_price, + } + if include_level and "level" in interval: + hold_point[level_field] = interval["level"] + if include_rating_level and "rating_level" in interval: + hold_point[rating_level_field] = interval["rating_level"] + if include_average and day in day_averages: + hold_point[average_field] = day_averages[day] + chart_data.append(hold_point) + + # Add NULL point to create gap after transition + null_point = {start_time_field: next_start_serialized, price_field: None} + chart_data.append(null_point) + else: + # Original behavior: Hold current price until next timestamp + hold_point = { + start_time_field: next_start_serialized, + price_field: converted_price, + } + if include_level and "level" in interval: + hold_point[level_field] = interval["level"] + if include_rating_level and "rating_level" in interval: + hold_point[rating_level_field] = interval["rating_level"] + if include_average and day in day_averages: + hold_point[average_field] = day_averages[day] + chart_data.append(hold_point) + + # Add NULL point to create gap + null_point = {start_time_field: next_start_serialized, price_field: None} + chart_data.append(null_point) # Handle last interval of the day - extend to midnight if day_prices: diff --git a/custom_components/tibber_prices/translations/en.json b/custom_components/tibber_prices/translations/en.json index 6d34517..a34c390 100644 --- a/custom_components/tibber_prices/translations/en.json +++ b/custom_components/tibber_prices/translations/en.json @@ -873,6 +873,10 @@ "name": "Insert NULL Values", "description": "Control NULL value insertion for filtered data. 'none' (default): No NULL values, only matching intervals. 'segments': Add NULL points at segment boundaries for clean gaps in charts (recommended for stepline charts). 'all': Insert NULL for all timestamps where filter doesn't match (useful for continuous time series visualization)." }, + "connect_segments": { + "name": "Connect Segments", + "description": "[ONLY WITH insert_nulls='segments'] When enabled, adds connecting points at segment boundaries to visually connect different price level segments in stepline charts. When price goes DOWN at a boundary, adds a point with the lower price at the end of the current segment. When price goes UP, adds a hold point before the gap. This creates smooth visual transitions between segments instead of abrupt gaps." + }, "add_trailing_null": { "name": "Add Trailing Null Point", "description": "[BOTH FORMATS] Add a final data point with null values (except timestamp) at the end. Some chart libraries need this to prevent extrapolation/interpolation to the viewport edge when using stepline rendering. Leave disabled unless your chart requires it." diff --git a/tests/test_connect_segments.py b/tests/test_connect_segments.py new file mode 100644 index 0000000..49bcf03 --- /dev/null +++ b/tests/test_connect_segments.py @@ -0,0 +1,158 @@ +"""Test connect_segments feature for ApexCharts segment boundaries.""" + +from datetime import UTC, datetime, timedelta + + +def make_interval(start_time: datetime, price: float, level: str) -> dict: + """Create a price interval for testing.""" + return { + "startsAt": start_time, + "total": price, + "level": level, + } + + +def make_day_prices(base_time: datetime) -> list[dict]: + """Create sample price data with level transitions.""" + return [ + # First segment: CHEAP at low price + make_interval(base_time, 0.10, "CHEAP"), + make_interval(base_time + timedelta(minutes=15), 0.11, "CHEAP"), + # Transition: CHEAP -> NORMAL (price goes UP) + make_interval(base_time + timedelta(minutes=30), 0.20, "NORMAL"), + make_interval(base_time + timedelta(minutes=45), 0.21, "NORMAL"), + # Transition: NORMAL -> CHEAP (price goes DOWN) + make_interval(base_time + timedelta(minutes=60), 0.12, "CHEAP"), + make_interval(base_time + timedelta(minutes=75), 0.13, "CHEAP"), + ] + + +class TestConnectSegmentsLogic: + """Test the connect_segments transition logic.""" + + def test_price_direction_detection_down(self) -> None: + """Test that price going down is correctly detected.""" + current_price = 0.20 + next_price = 0.12 + assert next_price < current_price, "Should detect price going down" + + def test_price_direction_detection_up(self) -> None: + """Test that price going up is correctly detected.""" + current_price = 0.11 + next_price = 0.20 + assert next_price > current_price, "Should detect price going up" + + def test_price_direction_detection_same(self) -> None: + """Test that same price is handled correctly (treated as up).""" + current_price = 0.15 + next_price = 0.15 + assert not (next_price < current_price), "Same price should not be treated as 'down'" + + def test_sample_data_structure(self) -> None: + """Test that sample data has expected structure.""" + base_time = datetime(2025, 12, 1, 0, 0, tzinfo=UTC) + prices = make_day_prices(base_time) + + assert len(prices) == 6, "Should have 6 intervals" + + # Check first transition (CHEAP -> NORMAL at index 1->2) + assert prices[1]["level"] == "CHEAP" + assert prices[2]["level"] == "NORMAL" + assert prices[2]["total"] > prices[1]["total"], "Price should go UP at this transition" + + # Check second transition (NORMAL -> CHEAP at index 3->4) + assert prices[3]["level"] == "NORMAL" + assert prices[4]["level"] == "CHEAP" + assert prices[4]["total"] < prices[3]["total"], "Price should go DOWN at this transition" + + +class TestConnectSegmentsOutput: + """Test the expected output format with connect_segments enabled.""" + + def test_transition_point_down_has_lower_price(self) -> None: + """ + When price goes DOWN at boundary, the transition point should have the lower price. + + This creates a visual line going downward from the current segment level. + """ + current_price = 0.20 + next_price = 0.12 + + # With connect_segments=True and price going down: + # The transition point should use next_price (lower price) + transition_price = min(current_price, next_price) + + assert transition_price == next_price + assert transition_price == 0.12 + + def test_transition_point_up_has_current_price(self) -> None: + """ + When price goes UP at boundary, the hold point should have current price. + + This creates a visual hold at current level before the gap. + """ + current_price = 0.11 + next_price = 0.20 + + # With connect_segments=True and price going up: + # The hold point should use current_price (extend current level) + hold_price = min(current_price, next_price) + + assert hold_price == current_price + assert hold_price == 0.11 + + +class TestSegmentBoundaryDetection: + """Test detection of segment boundaries.""" + + def test_same_level_no_boundary(self) -> None: + """Intervals with same level should not create boundary.""" + interval_value = "CHEAP" + next_value = "CHEAP" + is_boundary = next_value != interval_value + assert not is_boundary + + def test_different_level_creates_boundary(self) -> None: + """Intervals with different level should create boundary.""" + interval_value = "CHEAP" + next_value = "NORMAL" + is_boundary = next_value != interval_value + assert is_boundary + + def test_filter_match_check(self) -> None: + """Test that filter matching works correctly.""" + filter_values = ["CHEAP"] + interval_value = "CHEAP" + next_value = "NORMAL" + + matches_filter = interval_value in filter_values + assert matches_filter, "CHEAP should match filter" + + next_matches = next_value in filter_values + assert not next_matches, "NORMAL should not match filter" + + +class TestPriceConversion: + """Test price conversion logic used in connect_segments.""" + + def test_minor_currency_conversion(self) -> None: + """Test conversion to minor currency (cents/øre).""" + price = 0.12 # EUR + minor_currency = True + converted = round(price * 100, 2) if minor_currency else round(price, 4) + assert converted == 12.0, "0.12 EUR should be 12 cents" + + def test_major_currency_rounding(self) -> None: + """Test major currency precision.""" + price = 0.123456 + minor_currency = False + converted = round(price * 100, 2) if minor_currency else round(price, 4) + assert converted == 0.1235, "Should round to 4 decimal places" + + def test_custom_rounding(self) -> None: + """Test custom decimal rounding.""" + price = 0.12345 + converted = round(price, 4) + round_decimals = 2 + final = round(converted, round_decimals) + assert final == 0.12, "Should round to 2 decimal places"