feat(sensors): improve price trend sensors with temporal context

Enhanced current_price_trend and next_price_trend_change sensors with
consistent temporal information and fixed trend calculation logic.

Changes:
- Fixed trend calculation order: Calculate final trend state (momentum +
  future outlook) BEFORE scanning for next change, ensuring consistency
  between current_price_trend state and next_price_trend_change from_direction
- Added TIME_SENSITIVE_ENTITY_KEYS registration for both trend sensors
  to enable automatic 15-minute boundary updates (Timer #2)
- Removed redundant timestamp field from _trend_change_attributes (was
  duplicate of sensor state)
- Added timestamp attribute (current interval) to both sensors as first
  attribute for temporal reference
- Implemented _find_trend_start_time() to scan backward and determine
  when current trend began
- Added trend_duration_minutes to current_price_trend showing how long
  current trend has been active
- Added from_direction to current_price_trend showing previous trend
  state (enables detection of valleys/plateaus)
- Added minutes_until_change to next_price_trend_change showing time
  until trend changes
- Removed redundant attributes: valid_until, duration_hours,
  duration_minutes from current_price_trend (can be derived from
  next_price_trend_change sensor)
- Removed redundant next_direction from current_price_trend (available
  in next_price_trend_change sensor)

current_price_trend attributes:
- timestamp: Current interval (calculation basis)
- from_direction: Previous trend state (e.g., "stable" → "falling" = starting decline)
- trend_duration_minutes: How long current trend has been active

next_price_trend_change attributes:
- timestamp: Current interval (calculation basis)
- from_direction: Current trend state (should match current_price_trend state)
- direction: Target trend state
- minutes_until_change: Time until change occurs
- current_price_now, price_at_change, avg_after_change, trend_diff_%

Impact: Users can now detect important transitions (valleys: falling→stable,
plateaus: rising→stable) and understand trend context. Both sensors update
automatically every 15 minutes with consistent information.
This commit is contained in:
Julian Pawlowski 2025-11-16 17:09:16 +00:00
parent 76dc488bb5
commit 3a9ba55dd3
3 changed files with 129 additions and 37 deletions

View file

@ -176,6 +176,9 @@ TIME_SENSITIVE_ENTITY_KEYS = frozenset(
"next_avg_6h",
"next_avg_8h",
"next_avg_12h",
# Current/future price trend sensors (time-sensitive, update at interval boundaries)
"current_price_trend",
"next_price_trend_change",
# Price trend sensors
"price_trend_1h",
"price_trend_2h",

View file

@ -67,8 +67,21 @@ def _add_cached_trend_attributes(attributes: dict, key: str, cached_data: dict)
if key.startswith("price_trend_") and cached_data.get("trend_attributes"):
attributes.update(cached_data["trend_attributes"])
elif key == "current_price_trend" and cached_data.get("current_trend_attributes"):
# Add timestamp of current interval FIRST (when calculation was made)
now = dt_util.now()
minute = (now.minute // 15) * 15
current_interval_timestamp = now.replace(minute=minute, second=0, microsecond=0)
attributes["timestamp"] = current_interval_timestamp.isoformat()
# Then add other cached attributes
attributes.update(cached_data["current_trend_attributes"])
elif key == "next_price_trend_change" and cached_data.get("trend_change_attributes"):
# Add timestamp of current interval FIRST (when calculation was made)
# State contains the timestamp of the trend change itself
now = dt_util.now()
minute = (now.minute // 15) * 15
current_interval_timestamp = now.replace(minute=minute, second=0, microsecond=0)
attributes["timestamp"] = current_interval_timestamp.isoformat()
# Then add other cached attributes
attributes.update(cached_data["trend_change_attributes"])

View file

@ -153,6 +153,10 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity):
if self.entity_description.key.startswith("price_trend_"):
self._cached_trend_value = None
self._trend_attributes = {}
# Clear trend calculation cache for trend sensors
elif self.entity_description.key in ("current_price_trend", "next_price_trend_change"):
self._trend_calculation_cache = None
self._trend_calculation_timestamp = None
self.async_write_ha_state()
@callback
@ -1179,32 +1183,16 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity):
# Step 2: Calculate 3h baseline trend for comparison
current_trend_3h = self._calculate_standard_trend(all_intervals, current_index, current_interval, thresholds)
# Step 3: Find next trend change from current momentum
scan_params = {
"current_index": current_index,
"current_trend_state": current_momentum, # Use momentum, not 3h baseline
"current_interval": current_interval,
"now": now,
}
next_change_time = self._scan_for_trend_change(all_intervals, scan_params, thresholds)
# Step 4: Calculate final trend combining momentum + future outlook
# Step 3: Calculate final trend FIRST (momentum + future outlook)
min_intervals_for_trend = 4
standard_lookahead = 12 # 3 hours
if next_change_time:
time_diff = next_change_time - now
intervals_until_change = int(time_diff.total_seconds() / 900) # 900s = 15min
lookahead_intervals = max(min_intervals_for_trend, intervals_until_change)
else:
lookahead_intervals = standard_lookahead
lookahead_intervals = standard_lookahead
# Get future data
future_intervals = all_intervals[current_index + 1 : current_index + lookahead_intervals + 1]
future_prices = [float(fi["total"]) for fi in future_intervals if "total" in fi]
# Combine momentum + future outlook
# Combine momentum + future outlook to get ACTUAL current trend
if len(future_intervals) >= min_intervals_for_trend and future_prices:
future_avg = sum(future_prices) / len(future_prices)
current_trend_state = self._combine_momentum_with_future(
@ -1222,24 +1210,45 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity):
# Not enough future data - use 3h baseline as fallback
current_trend_state = current_trend_3h
# Step 4: Find next trend change FROM the current trend state (not momentum!)
scan_params = {
"current_index": current_index,
"current_trend_state": current_trend_state, # Use FINAL trend, not momentum
"current_interval": current_interval,
"now": now,
}
next_change_time = self._scan_for_trend_change(all_intervals, scan_params, thresholds)
# Step 5: Find when current trend started (scan backward)
trend_start_time, from_direction = self._find_trend_start_time(
all_intervals, current_index, current_trend_state, thresholds
)
# Calculate duration of current trend
trend_duration_minutes = None
if trend_start_time:
duration = now - trend_start_time
trend_duration_minutes = int(duration.total_seconds() / 60)
# Build result dictionary
next_direction = self._trend_change_attributes.get("direction") if self._trend_change_attributes else None
# Calculate minutes until change
minutes_until_change = None
if next_change_time:
time_diff = next_change_time - now
minutes_until_change = int(time_diff.total_seconds() / 60)
result = {
"current_trend_state": current_trend_state,
"next_change_time": next_change_time,
"next_change_direction": next_direction,
"valid_until": next_change_time.isoformat() if next_change_time else None,
"duration_hours": None,
"duration_minutes": None,
"trend_change_attributes": self._trend_change_attributes,
"trend_start_time": trend_start_time,
"from_direction": from_direction,
"trend_duration_minutes": trend_duration_minutes,
"minutes_until_change": minutes_until_change,
}
if next_change_time:
time_diff = next_change_time - now
result["duration_hours"] = round(time_diff.total_seconds() / 3600, 1)
result["duration_minutes"] = int(time_diff.total_seconds() / 60)
# Cache the result
self._trend_calculation_cache = result
self._trend_calculation_timestamp = now
@ -1263,10 +1272,8 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity):
# Set attributes for this sensor
self._current_trend_attributes = {
"valid_until": trend_info["valid_until"],
"next_direction": trend_info["next_change_direction"],
"duration_hours": trend_info["duration_hours"],
"duration_minutes": trend_info["duration_minutes"],
"from_direction": trend_info["from_direction"],
"trend_duration_minutes": trend_info["trend_duration_minutes"],
}
return trend_info["current_trend_state"]
@ -1279,6 +1286,78 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity):
return idx
return None
def _find_trend_start_time(
self,
all_intervals: list,
current_index: int,
current_trend_state: str,
thresholds: dict,
) -> tuple[datetime | None, str | None]:
"""
Find when the current trend started by scanning backward.
Args:
all_intervals: List of all price intervals
current_index: Index of current interval
current_trend_state: Current trend state ("rising", "falling", "stable")
thresholds: Threshold configuration
Returns:
Tuple of (start_time, from_direction):
- start_time: When current trend began, or None if at data boundary
- from_direction: Previous trend direction, or None if unknown
"""
intervals_in_3h = 12 # 3 hours = 12 intervals @ 15min each
# Scan backward to find when trend changed TO current state
for i in range(current_index - 1, max(-1, current_index - 97), -1):
if i < 0:
break
interval = all_intervals[i]
interval_start = dt_util.parse_datetime(interval["startsAt"])
if not interval_start:
continue
interval_start = dt_util.as_local(interval_start)
# Calculate trend at this past interval
future_intervals = all_intervals[i + 1 : i + intervals_in_3h + 1]
if len(future_intervals) < intervals_in_3h:
break # Not enough data to calculate trend
future_prices = [float(fi["total"]) for fi in future_intervals if "total" in fi]
if not future_prices:
continue
future_avg = sum(future_prices) / len(future_prices)
price = float(interval["total"])
# Calculate trend at this past point
lookahead_for_volatility = all_intervals[i : i + intervals_in_3h]
trend_state, _ = calculate_price_trend(
price,
future_avg,
threshold_rising=thresholds["rising"],
threshold_falling=thresholds["falling"],
volatility_adjustment=True,
lookahead_intervals=intervals_in_3h,
all_intervals=lookahead_for_volatility,
volatility_threshold_moderate=thresholds["moderate"],
volatility_threshold_high=thresholds["high"],
)
# Check if trend was different from current trend state
if trend_state != current_trend_state:
# Found the change point - the NEXT interval is where current trend started
next_interval = all_intervals[i + 1]
trend_start = dt_util.parse_datetime(next_interval["startsAt"])
if trend_start:
return dt_util.as_local(trend_start), trend_state
# Reached data boundary - current trend extends beyond available data
return None, None
def _scan_for_trend_change(
self,
all_intervals: list,
@ -1342,19 +1421,16 @@ class TibberPricesSensor(TibberPricesEntity, SensorEntity):
if trend_state != current_trend_state:
# Store details for attributes
time_diff = interval_start - now
hours_until = time_diff.total_seconds() / 3600
minutes_until = int(time_diff.total_seconds() / 60)
self._trend_change_attributes = {
"timestamp": interval_start.isoformat(),
"direction": trend_state,
"from_direction": current_trend_state,
"minutes_until_change": minutes_until,
"current_price_now": round(float(current_interval["total"]) * 100, 2),
"price_at_change": round(current_price * 100, 2),
"avg_after_change": round(future_avg * 100, 2),
"trend_diff_%": round((future_avg - current_price) / current_price * 100, 1),
"hours_until_change": round(hours_until, 1),
"minutes_until_change": minutes_until,
}
return interval_start