hass.tibber_prices/docs/developer/versioned_docs/version-v0.25.0b0/performance.md

6.9 KiB

Performance Optimization

Guidelines for maintaining and improving integration performance.

Performance Goals

Target metrics:

  • Coordinator update: <500ms (typical: 200-300ms)
  • Sensor update: <10ms per sensor
  • Period calculation: <100ms (typical: 20-50ms)
  • Memory footprint: <10MB per home
  • API calls: <100 per day per home

Profiling

Timing Decorator

Use for performance-critical functions:

import time
import functools

def timing(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        duration = time.perf_counter() - start
        _LOGGER.debug("%s took %.3fms", func.__name__, duration * 1000)
        return result
    return wrapper

@timing
def expensive_calculation():
    # Your code here

Memory Profiling

import tracemalloc

tracemalloc.start()
# Run your code
current, peak = tracemalloc.get_traced_memory()
_LOGGER.info("Memory: current=%.2fMB peak=%.2fMB",
             current / 1024**2, peak / 1024**2)
tracemalloc.stop()

Async Profiling

# Install aioprof
uv pip install aioprof

# Run with profiling
python -m aioprof homeassistant -c config

Optimization Patterns

Caching

1. Persistent Cache (API data):

# Already implemented in coordinator/cache.py
store = Store(hass, STORAGE_VERSION, STORAGE_KEY)
data = await store.async_load()

2. Translation Cache (in-memory):

# Already implemented in const.py
_TRANSLATION_CACHE: dict[str, dict] = {}

def get_translation(path: str, language: str) -> dict:
    cache_key = f"{path}_{language}"
    if cache_key not in _TRANSLATION_CACHE:
        _TRANSLATION_CACHE[cache_key] = load_translation(path, language)
    return _TRANSLATION_CACHE[cache_key]

3. Config Cache (invalidated on options change):

class DataTransformer:
    def __init__(self):
        self._config_cache: dict | None = None

    def get_config(self) -> dict:
        if self._config_cache is None:
            self._config_cache = self._build_config()
        return self._config_cache

    def invalidate_config_cache(self):
        self._config_cache = None

Lazy Loading

Load data only when needed:

@property
def extra_state_attributes(self) -> dict | None:
    """Return attributes."""
    # Calculate only when accessed
    if self.entity_description.key == "complex_sensor":
        return self._calculate_complex_attributes()
    return None

Bulk Operations

Process multiple items at once:

# ❌ Slow - loop with individual operations
for interval in intervals:
    enriched = enrich_single_interval(interval)
    results.append(enriched)

# ✅ Fast - bulk processing
results = enrich_intervals_bulk(intervals)

Async Best Practices

1. Concurrent API calls:

# ❌ Sequential (slow)
user_data = await fetch_user_data()
price_data = await fetch_price_data()

# ✅ Concurrent (fast)
user_data, price_data = await asyncio.gather(
    fetch_user_data(),
    fetch_price_data()
)

2. Don't block event loop:

# ❌ Blocking
result = heavy_computation()  # Blocks for seconds

# ✅ Non-blocking
result = await hass.async_add_executor_job(heavy_computation)

Memory Management

Avoid Memory Leaks

1. Clear references:

class Coordinator:
    async def async_shutdown(self):
        """Clean up resources."""
        self._listeners.clear()
        self._data = None
        self._cache = None

2. Use weak references for callbacks:

import weakref

class Manager:
    def __init__(self):
        self._callbacks: list[weakref.ref] = []

    def register(self, callback):
        self._callbacks.append(weakref.ref(callback))

Efficient Data Structures

Use appropriate types:

# ❌ List for lookups (O(n))
if timestamp in timestamp_list:
    ...

# ✅ Set for lookups (O(1))
if timestamp in timestamp_set:
    ...

# ❌ List comprehension with filter
results = [x for x in items if condition(x)]

# ✅ Generator for large datasets
results = (x for x in items if condition(x))

Coordinator Optimization

Minimize API Calls

Already implemented:

  • Cache valid until midnight
  • User data cached for 24h
  • Only poll when tomorrow data expected

Monitor API usage:

_LOGGER.debug("API call: %s (cache_age=%s)",
              endpoint, cache_age)

Smart Updates

Only update when needed:

async def _async_update_data(self) -> dict:
    """Fetch data from API."""
    if self._is_cache_valid():
        _LOGGER.debug("Using cached data")
        return self.data

    # Fetch new data
    return await self._fetch_data()

Database Impact

State Class Selection

Affects long-term statistics storage:

# ❌ MEASUREMENT for prices (stores every change)
state_class=SensorStateClass.MEASUREMENT  # ~35K records/year

# ✅ None for prices (no long-term stats)
state_class=None  # Only current state

# ✅ TOTAL for counters only
state_class=SensorStateClass.TOTAL  # For cumulative values

Attribute Size

Keep attributes minimal:

# ❌ Large nested structures (KB per update)
attributes = {
    "all_intervals": [...],  # 384 intervals
    "full_history": [...],   # Days of data
}

# ✅ Essential data only (bytes per update)
attributes = {
    "timestamp": "...",
    "rating_level": "...",
    "next_interval": "...",
}

Testing Performance

Benchmark Tests

import pytest
import time

@pytest.mark.benchmark
def test_period_calculation_performance(coordinator):
    """Period calculation should complete in &lt;100ms."""
    start = time.perf_counter()

    periods = calculate_periods(coordinator.data)

    duration = time.perf_counter() - start
    assert duration < 0.1, f"Too slow: {duration:.3f}s"

Load Testing

@pytest.mark.integration
async def test_multiple_homes_performance(hass):
    """Test with 10 homes."""
    coordinators = []
    for i in range(10):
        coordinator = create_coordinator(hass, home_id=f"home_{i}")
        await coordinator.async_refresh()
        coordinators.append(coordinator)

    # Verify memory usage
    # Verify update times

Monitoring in Production

Log Performance Metrics

@timing
async def _async_update_data(self) -> dict:
    """Fetch data with timing."""
    result = await self._fetch_data()
    _LOGGER.info("Update completed in %.2fs", timing_duration)
    return result

Memory Tracking

import psutil
import os

process = psutil.Process(os.getpid())
memory_mb = process.memory_info().rss / 1024**2
_LOGGER.debug("Current memory usage: %.2f MB", memory_mb)

💡 Related: