hass.tibber_prices/docs/developer/docs/performance.md
Julian Pawlowski d73eda4b2f git commit -m "feat(docs): add dual Docusaurus sites with custom branding and Giscus integration
- Split documentation into separate User and Developer sites
- Migrated existing docs to proper Docusaurus structure
- Added custom Tibber-themed header logos (light + dark mode variants)
- Implemented custom color scheme matching integration branding
  - Hero gradient: Cyan → Dark Cyan → Gold
  - Removed standard Docusaurus purple/green theme
- Integrated Giscus comments system for community collaboration
  - User docs: Comments enabled on guides, examples, FAQ
  - User docs: Comments disabled on reference pages (glossary, sensors, troubleshooting)
  - Developer docs: No comments (GitHub Issues/PRs preferred)
- Added categorized sidebars with emoji navigation
- Created 8 new placeholder documentation pages
- Fixed image paths for baseUrl compatibility (local + GitHub Pages)
- Escaped MDX special characters in performance metrics
- Added GitHub Actions workflow for automated deployment
- Created helper scripts: dev-user, dev-developer, build-all

Breaking changes:
- Moved /docs/user/*.md to /docs/user/docs/*.md
- Moved /docs/development/*.md to /docs/developer/docs/*.md
2025-12-06 01:37:06 +00:00


# Performance Optimization
Guidelines for maintaining and improving integration performance.
## Performance Goals
Target metrics:
- **Coordinator update**: <500ms (typical: 200-300ms)
- **Sensor update**: <10ms per sensor
- **Period calculation**: <100ms (typical: 20-50ms)
- **Memory footprint**: <10MB per home
- **API calls**: <100 per day per home
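
The time-based targets above can be checked at runtime with a small helper. The following is a minimal sketch, not part of the integration: `BUDGETS_MS`, `exceeds_budget` and `time_budget` are hypothetical names, and the numbers are simply copied from the list above.

```python
import logging
import time
from contextlib import contextmanager

_LOGGER = logging.getLogger(__name__)

# Upper bounds in milliseconds, taken from the target metrics above.
BUDGETS_MS = {
    "coordinator_update": 500.0,
    "sensor_update": 10.0,
    "period_calculation": 100.0,
}


def exceeds_budget(name: str, elapsed_ms: float) -> bool:
    """Return True when a measured duration misses its target (hypothetical helper)."""
    return elapsed_ms >= BUDGETS_MS[name]


@contextmanager
def time_budget(name: str):
    """Time the wrapped block and log a warning if it misses its target."""
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed_ms = (time.perf_counter() - start) * 1000
        if exceeds_budget(name, elapsed_ms):
            _LOGGER.warning("%s took %.1fms, over budget", name, elapsed_ms)
```

Wrapping a block in `with time_budget("period_calculation"):` would then log a warning only when the block takes 100ms or longer.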
## Profiling
### Timing Decorator
Use for performance-critical functions:
```python
import functools
import logging
import time

_LOGGER = logging.getLogger(__name__)


def timing(func):
    """Log the wall-clock duration of each call at debug level."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        duration = time.perf_counter() - start
        _LOGGER.debug("%s took %.3fms", func.__name__, duration * 1000)
        return result

    return wrapper


@timing
def expensive_calculation():
    ...  # Your code here
```
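
The decorator above wraps synchronous functions. Applied to a coroutine function, it would only time the creation of the coroutine object, not the awaited work. A possible async variant is sketched below; `async_timing` and `fetch_something` are illustrative names, not existing helpers in the integration.

```python
import asyncio
import functools
import logging
import time

_LOGGER = logging.getLogger(__name__)


def async_timing(func):
    """Log how long an awaited coroutine function takes (illustrative name)."""

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return await func(*args, **kwargs)
        finally:
            # Runs on success and on exceptions, so failures are timed too.
            duration = time.perf_counter() - start
            _LOGGER.debug("%s took %.3fms", func.__name__, duration * 1000)

    return wrapper


@async_timing
async def fetch_something() -> str:
    await asyncio.sleep(0.01)  # stand-in for real I/O
    return "done"
```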
### Memory Profiling
```python
import tracemalloc
tracemalloc.start()
# Run your code
current, peak = tracemalloc.get_traced_memory()
_LOGGER.info("Memory: current=%.2fMB peak=%.2fMB",
             current / 1024**2, peak / 1024**2)
tracemalloc.stop()
```
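
Current and peak totals show how much memory is in use, but not where it was allocated. As a sketch of one way to narrow that down, two `tracemalloc` snapshots can be compared line by line. The list comprehension is only a stand-in workload.

```python
import tracemalloc

tracemalloc.start()
before = tracemalloc.take_snapshot()

# Stand-in workload: replace with the code under investigation.
data = [str(i) * 10 for i in range(50_000)]

after = tracemalloc.take_snapshot()
tracemalloc.stop()

# Differences grouped by source line, largest change first.
top_stats = after.compare_to(before, "lineno")
for stat in top_stats[:5]:
    print(stat)
```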
### Async Profiling
```bash
# Install aioprof
uv pip install aioprof
# Run with profiling
python -m aioprof homeassistant -c config
```
## Optimization Patterns
### Caching
**1. Persistent Cache** (API data):
```python
# Already implemented in coordinator/cache.py
store = Store(hass, STORAGE_VERSION, STORAGE_KEY)
data = await store.async_load()
```
**2. Translation Cache** (in-memory):
```python
# Already implemented in const.py
_TRANSLATION_CACHE: dict[str, dict] = {}


def get_translation(path: str, language: str) -> dict:
    cache_key = f"{path}_{language}"
    if cache_key not in _TRANSLATION_CACHE:
        _TRANSLATION_CACHE[cache_key] = load_translation(path, language)
    return _TRANSLATION_CACHE[cache_key]
```
**3. Config Cache** (invalidated on options change):
```python
class DataTransformer:
    def __init__(self):
        self._config_cache: dict | None = None

    def get_config(self) -> dict:
        if self._config_cache is None:
            self._config_cache = self._build_config()
        return self._config_cache

    def invalidate_config_cache(self):
        self._config_cache = None
```
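
To make the invalidation flow concrete, here is a self-contained sketch of the same pattern. The `options` argument, the `update_options` method, the `build_count` counter and the doubled threshold are all assumptions made for illustration. The real class builds its config from the integration's options.

```python
class DataTransformer:
    def __init__(self, options: dict):
        self._options = options
        self._config_cache: dict | None = None
        self.build_count = 0  # only here to make cache hits visible

    def _build_config(self) -> dict:
        # Hypothetical "expensive" derivation from the raw options.
        self.build_count += 1
        return {"threshold": self._options.get("threshold", 0) * 2}

    def get_config(self) -> dict:
        if self._config_cache is None:
            self._config_cache = self._build_config()
        return self._config_cache

    def invalidate_config_cache(self) -> None:
        self._config_cache = None

    def update_options(self, options: dict) -> None:
        # Changing options must drop the cache, or stale config is served.
        self._options = options
        self.invalidate_config_cache()


transformer = DataTransformer({"threshold": 5})
transformer.get_config()
transformer.get_config()  # served from cache, no rebuild
builds_before_change = transformer.build_count

transformer.update_options({"threshold": 7})
config_after_change = transformer.get_config()  # rebuilt once
```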
### Lazy Loading
**Load data only when needed:**
```python
@property
def extra_state_attributes(self) -> dict | None:
    """Return attributes."""
    # Calculate only when accessed
    if self.entity_description.key == "complex_sensor":
        return self._calculate_complex_attributes()
    return None
```
### Bulk Operations
**Process multiple items at once:**
```python
# ❌ Slow - loop with individual operations
results = []
for interval in intervals:
    enriched = enrich_single_interval(interval)
    results.append(enriched)

# ✅ Fast - bulk processing
results = enrich_intervals_bulk(intervals)
```
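
Bulk processing usually wins because shared work is done once instead of once per item. The sketch below shows that with a price average. Both function bodies, the two-argument signature of `enrich_single_interval` and the `diff_to_avg` key are hypothetical and are not the integration's actual enrichment logic.

```python
from statistics import fmean


def enrich_single_interval(interval: dict, all_intervals: list[dict]) -> dict:
    # Recomputes the average on every call: O(n) work per interval.
    average = fmean(i["price"] for i in all_intervals)
    return {**interval, "diff_to_avg": interval["price"] - average}


def enrich_intervals_bulk(intervals: list[dict]) -> list[dict]:
    # Computes the shared average once and reuses it: O(n) work overall.
    average = fmean(i["price"] for i in intervals)
    return [{**i, "diff_to_avg": i["price"] - average} for i in intervals]


intervals = [{"price": 0.20}, {"price": 0.30}, {"price": 0.40}]
slow = [enrich_single_interval(i, intervals) for i in intervals]
fast = enrich_intervals_bulk(intervals)
```

Both variants produce the same result. With n intervals the per-item version does roughly n² work, while the bulk version stays linear.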
### Async Best Practices
**1. Concurrent API calls:**
```python
# ❌ Sequential (slow)
user_data = await fetch_user_data()
price_data = await fetch_price_data()
# ✅ Concurrent (fast)
user_data, price_data = await asyncio.gather(
    fetch_user_data(),
    fetch_price_data(),
)
```
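
The effect is easy to reproduce outside Home Assistant. In this sketch the two fetch functions are stand-ins that only sleep to simulate network latency. They are assumptions, not the integration's API client.

```python
import asyncio
import time


async def fetch_user_data() -> dict:
    await asyncio.sleep(0.1)  # simulated network latency
    return {"user": "demo"}


async def fetch_price_data() -> list[float]:
    await asyncio.sleep(0.1)  # simulated network latency
    return [0.21, 0.19]


async def main() -> tuple[float, dict, list[float]]:
    start = time.perf_counter()
    # Both coroutines wait at the same time, so the total is roughly
    # the longest single wait rather than the sum of both.
    user_data, price_data = await asyncio.gather(
        fetch_user_data(),
        fetch_price_data(),
    )
    return time.perf_counter() - start, user_data, price_data


elapsed, user_data, price_data = asyncio.run(main())
print(f"Both calls finished in {elapsed:.2f}s")
```

By default `asyncio.gather` propagates the first exception to the caller. Passing `return_exceptions=True` returns exceptions as results instead, which can be useful when one failed call should not discard the other result.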
**2. Don't block event loop:**
```python
# ❌ Blocking
result = heavy_computation() # Blocks for seconds
# ✅ Non-blocking
result = await hass.async_add_executor_job(heavy_computation)
```
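
Outside Home Assistant, the same idea can be sketched with `asyncio.to_thread`, used here as a stand-in for `hass.async_add_executor_job`. The `heartbeat` task is a hypothetical probe showing that the event loop keeps running while the blocking call executes in a worker thread.

```python
import asyncio
import time


def heavy_computation() -> int:
    time.sleep(0.2)  # stand-in for blocking work
    return 42


async def heartbeat(ticks: list[float]) -> None:
    # Records a timestamp every 20ms for as long as the loop is responsive.
    while True:
        ticks.append(time.perf_counter())
        await asyncio.sleep(0.02)


async def main() -> tuple[int, int]:
    ticks: list[float] = []
    beat = asyncio.create_task(heartbeat(ticks))
    # The blocking function runs in a worker thread; the loop stays free.
    result = await asyncio.to_thread(heavy_computation)
    beat.cancel()
    return result, len(ticks)


result, tick_count = asyncio.run(main())
print(f"result={result}, heartbeat ticks while waiting: {tick_count}")
```

Calling `heavy_computation()` directly inside `main` would stall the heartbeat for the whole 0.2 seconds.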
## Memory Management
### Avoid Memory Leaks
**1. Clear references:**
```python
class Coordinator:
    async def async_shutdown(self):
        """Clean up resources."""
        self._listeners.clear()
        self._data = None
        self._cache = None
```
**2. Use weak references for callbacks:**
```python
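import inspect
import weakref

class Manager:
    def __init__(self):
        self._callbacks: list[weakref.ref] = []

    def register(self, callback):
        # A plain ref to a bound method dies immediately; use WeakMethod.
        make_ref = weakref.WeakMethod if inspect.ismethod(callback) else weakref.ref
        self._callbacks.append(make_ref(callback))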
```
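
Weak references only help if dead entries are skipped and removed when callbacks are invoked. The sketch below adds a hypothetical `notify` method and a `Listener` class to illustrate this. It uses `weakref.WeakMethod` for bound methods, because a plain `weakref.ref` to a bound method is dead as soon as it is created.

```python
import gc
import inspect
import weakref


class Manager:
    def __init__(self):
        self._callbacks: list[weakref.ref] = []

    def register(self, callback):
        make_ref = weakref.WeakMethod if inspect.ismethod(callback) else weakref.ref
        self._callbacks.append(make_ref(callback))

    def notify(self, value) -> int:
        """Call live callbacks, drop dead ones, return how many were called."""
        alive = []
        for ref in self._callbacks:
            callback = ref()  # None once the referent has been collected
            if callback is not None:
                callback(value)
                alive.append(ref)
        self._callbacks = alive
        return len(alive)


class Listener:
    def __init__(self):
        self.seen = []

    def on_update(self, value):
        self.seen.append(value)


manager = Manager()
listener = Listener()
seen = listener.seen  # keep the list so it can be inspected later
manager.register(listener.on_update)

called_while_alive = manager.notify(1)
del listener
gc.collect()
called_after_delete = manager.notify(2)  # dead reference is pruned
```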
### Efficient Data Structures
**Use appropriate types:**
```python
# ❌ List for lookups (O(n))
if timestamp in timestamp_list:
    ...

# ✅ Set for lookups (O(1) on average)
if timestamp in timestamp_set:
    ...

# ❌ List comprehension builds the full filtered list in memory
results = [x for x in items if condition(x)]

# ✅ Generator for large datasets (lazy, can be iterated only once)
results = (x for x in items if condition(x))
```
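
Both choices come with a trade-off worth seeing once. In this sketch the timestamps are synthetic 15-minute intervals (an assumption for illustration): the set gives the same answer as the list, and the generator is empty on a second pass.

```python
from datetime import datetime, timedelta

start = datetime(2025, 1, 1)
timestamp_list = [start + timedelta(minutes=15 * i) for i in range(384)]
timestamp_set = set(timestamp_list)  # build once, reuse for every lookup

probe = timestamp_list[-1]  # worst case for the list: scanned to the end
found_in_list = probe in timestamp_list
found_in_set = probe in timestamp_set

# A generator is consumed by the first pass over it.
evens = (i for i in range(10) if i % 2 == 0)
first_pass = list(evens)
second_pass = list(evens)  # empty: the generator is already exhausted
```

Use a list when the filtered results are needed more than once, and a generator when they are streamed through a single loop.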
## Coordinator Optimization
### Minimize API Calls
**Already implemented:**
- Cache valid until midnight
- User data cached for 24h
- Only poll when tomorrow data expected
**Monitor API usage:**
```python
_LOGGER.debug("API call: %s (cache_age=%s)",
              endpoint, cache_age)
```
### Smart Updates
**Only update when needed:**
```python
async def _async_update_data(self) -> dict:
    """Fetch data from API."""
    if self._is_cache_valid():
        _LOGGER.debug("Using cached data")
        return self.data

    # Fetch new data
    return await self._fetch_data()
```
## Database Impact
### State Class Selection
**Affects long-term statistics storage:**
```python
# ❌ MEASUREMENT for prices (stores every change)
state_class=SensorStateClass.MEASUREMENT # ~35K records/year
# ✅ None for prices (no long-term stats)
state_class=None # Only current state
# ✅ TOTAL for counters only
state_class=SensorStateClass.TOTAL # For cumulative values
```
### Attribute Size
**Keep attributes minimal:**
```python
# ❌ Large nested structures (KB per update)
attributes = {
"all_intervals": [...], # 384 intervals
"full_history": [...], # Days of data
}
# ✅ Essential data only (bytes per update)
attributes = {
"timestamp": "...",
"rating_level": "...",
"next_interval": "...",
}
```
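
A rough way to compare attribute payloads is to measure their JSON encoding. This is only an approximation of what gets stored, and `attribute_size_bytes` as well as the sample data are hypothetical.

```python
import json


def attribute_size_bytes(attributes: dict) -> int:
    """Approximate the stored size as the length of the JSON encoding."""
    return len(json.dumps(attributes, default=str).encode("utf-8"))


# Synthetic example payloads, not real sensor attributes.
large = {
    "all_intervals": [
        {"start": f"interval-{i}", "price": 0.25} for i in range(384)
    ]
}
small = {"timestamp": "2025-01-01T00:00:00+01:00", "rating_level": "LOW"}

print(attribute_size_bytes(large), attribute_size_bytes(small))
```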
## Testing Performance
### Benchmark Tests
```python
import time

import pytest


@pytest.mark.benchmark
def test_period_calculation_performance(coordinator):
    """Period calculation should complete in <100ms."""
    start = time.perf_counter()
    periods = calculate_periods(coordinator.data)
    duration = time.perf_counter() - start

    assert duration < 0.1, f"Too slow: {duration:.3f}s"
```
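
A single timed run can be noisy on shared CI machines. One option, sketched here with the standard library's `timeit.repeat`, is to take the best of several runs. The `calculate_periods` body below is a hypothetical stand-in so the example runs on its own.

```python
import timeit


def calculate_periods(prices: list[float]) -> list[int]:
    # Hypothetical stand-in: indices of below-average prices.
    average = sum(prices) / len(prices)
    return [i for i, price in enumerate(prices) if price < average]


prices = [0.10 + (i % 24) * 0.01 for i in range(384)]

# Five runs of 100 calls each; the minimum is the least disturbed run.
runs = timeit.repeat(lambda: calculate_periods(prices), number=100, repeat=5)
best_per_call = min(runs) / 100

print(f"Best time per call: {best_per_call * 1000:.3f}ms")
```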
### Load Testing
```python
@pytest.mark.integration
async def test_multiple_homes_performance(hass):
    """Test with 10 homes."""
    coordinators = []
    for i in range(10):
        coordinator = create_coordinator(hass, home_id=f"home_{i}")
        await coordinator.async_refresh()
        coordinators.append(coordinator)

    # Verify memory usage
    # Verify update times
```
## Monitoring in Production
### Log Performance Metrics
```python
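import time

async def _async_update_data(self) -> dict:
    """Fetch data with timing."""
    # Time the await inline: a synchronous @timing wrapper would only
    # measure coroutine creation, not the fetch itself.
    start = time.perf_counter()
    result = await self._fetch_data()
    _LOGGER.info("Update completed in %.2fs", time.perf_counter() - start)
    return result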
```
### Memory Tracking
```python
import psutil
import os
process = psutil.Process(os.getpid())
memory_mb = process.memory_info().rss / 1024**2
_LOGGER.debug("Current memory usage: %.2f MB", memory_mb)
```
---
💡 **Related:**
- [Caching Strategy](caching-strategy.md) - Cache layers
- [Architecture](architecture.md) - System design
- [Debugging](debugging.md) - Profiling tools