
IoT Sensor Monitoring with Streaming Anomaly Detection

This use case demonstrates how to build a production-ready IoT sensor monitoring system using LlamaFarm's streaming anomaly detection and Polars buffer APIs.

Scenario

You're monitoring a fleet of industrial sensors (temperature, humidity, pressure) that produce data every second. You need to:

  1. Detect equipment malfunctions in real-time
  2. Handle concept drift as seasons change
  3. Alert operators to anomalies within seconds
  4. Run on edge devices with limited resources

Architecture

```
┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│   Sensors   │───▶│  LlamaFarm  │───▶│   Alerts    │
│ (1000/sec)  │    │  Streaming  │    │  Dashboard  │
└─────────────┘    └──────┬──────┘    └─────────────┘
                          │
                   ┌──────┴──────┐
                   │   Polars    │
                   │   Buffer    │
                   │  (rolling)  │
                   └─────────────┘
```

Why These Components?

| Component | Why |
|---|---|
| Streaming API | Handles cold start, auto-retraining, sliding window |
| ECOD Backend | Fast (5000+ samples/sec), parameter-free, handles drift |
| Polars Buffers | Sub-millisecond append, efficient rolling features |
| Rolling Features | Detect gradual trends and sudden spikes |

Backend Selection Guide

For IoT sensor monitoring:

| Backend | When to Use | Speed | Accuracy |
|---|---|---|---|
| ecod | Default choice: works great for most sensors | ⚡⚡⚡⚡ | ⭐⭐⭐⭐ |
| hbos | Ultra-high throughput (>10k samples/sec) | ⚡⚡⚡⚡⚡ | ⭐⭐⭐ |
| isolation_forest | Complex multi-sensor correlations | ⚡⚡⚡ | ⭐⭐⭐⭐ |
| loda | Memory-constrained edge devices | ⚡⚡⚡⚡ | ⭐⭐⭐ |
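The decision rules in the table above can be folded into a small helper. This is an illustrative sketch, not part of the LlamaFarm API; the function name and thresholds are assumptions that simply mirror the table:

```python
# Illustrative helper (not part of the LlamaFarm API): suggest a backend
# name using the rules from the selection table above.
def pick_backend(samples_per_sec: float,
                 memory_constrained: bool = False,
                 multi_sensor_correlations: bool = False) -> str:
    """Return a suggested anomaly-detection backend name."""
    if memory_constrained:
        return "loda"              # lowest memory footprint
    if samples_per_sec > 10_000:
        return "hbos"              # ultra-high throughput
    if multi_sensor_correlations:
        return "isolation_forest"  # models feature interactions
    return "ecod"                  # parameter-free default

print(pick_backend(1000))                          # ecod
print(pick_backend(20_000))                        # hbos
print(pick_backend(500, memory_constrained=True))  # loda
```

For this use case (1000 samples/second, no hard memory limit), the helper lands on ecod, which matches the reasoning below.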

For this use case, we'll use ecod because:

  • Parameter-free (no tuning needed)
  • Fast enough for 1000 samples/second
  • Handles seasonal drift naturally

Step 1: Configure the Detector

```python
# sensor_monitor.py
import httpx
import asyncio
import random
from datetime import datetime

# Configuration
LLAMAFARM_URL = "http://localhost:14345/v1/ml"
MODEL_ID = "factory-sensors"
BACKEND = "ecod"

# Streaming parameters tuned for IoT
DETECTOR_CONFIG = {
    "model": MODEL_ID,
    "backend": BACKEND,
    "min_samples": 100,       # Enough for initial pattern learning
    "retrain_interval": 500,  # Retrain every 500 samples (~8 minutes)
    "window_size": 3600,      # Keep 1 hour of history
    "threshold": 0.5,         # Balanced sensitivity
    "contamination": 0.05,    # Expect ~5% anomalies
    # Enable rolling features for trend detection
    "rolling_windows": [10, 60, 300],  # 10 s, 1 min, 5 min windows
    "include_lags": True,
    "lag_periods": [1, 5, 10],         # Compare to recent values
}
```

Step 2: Create the Streaming Loop

```python
async def stream_sensor_data():
    """Main streaming loop for sensor monitoring."""
    async with httpx.AsyncClient(timeout=30.0) as client:
        sample_count = 0
        anomaly_count = 0

        while True:
            # Simulate sensor reading
            sensor_data = read_sensors()

            # Send to streaming detector
            response = await client.post(
                f"{LLAMAFARM_URL}/anomaly/stream",
                json={**DETECTOR_CONFIG, "data": sensor_data},
            )
            result = response.json()

            sample_count += 1

            # Check status
            if result["status"] == "collecting":
                print(f"Cold start: {result['samples_until_ready']} samples until ready")
            else:
                # Process results
                for r in result["results"]:
                    if r["is_anomaly"]:
                        anomaly_count += 1
                        await send_alert(sensor_data, r["score"])

                # Log progress every 100 samples (model_version is only
                # present once the detector has finished its cold start)
                if sample_count % 100 == 0:
                    print(f"Processed: {sample_count}, Anomalies: {anomaly_count}, "
                          f"Model version: {result['model_version']}")

            await asyncio.sleep(1)  # 1 sample per second


def read_sensors() -> dict:
    """Simulate sensor readings (replace with real sensor code)."""
    # Normal operating ranges
    temp = 72 + random.gauss(0, 2)        # 72°F ± 2
    humidity = 45 + random.gauss(0, 3)    # 45% ± 3
    pressure = 1013 + random.gauss(0, 5)  # 1013 hPa ± 5

    # Simulate occasional real anomalies
    if random.random() < 0.02:            # 2% chance
        temp = random.choice([50, 95])    # Equipment malfunction

    return {
        "temperature": round(temp, 2),
        "humidity": round(humidity, 2),
        "pressure": round(pressure, 2),
        "timestamp": datetime.now().isoformat(),
    }


async def send_alert(data: dict, score: float):
    """Send alert to monitoring system."""
    print(f"🚨 ANOMALY DETECTED! Score: {score:.3f}")
    print(f"   Data: temp={data['temperature']}, humidity={data['humidity']}")
    # In production: send to PagerDuty, Slack, email, etc.
```

Step 3: Understanding the Rolling Features

When rolling_windows is enabled, the detector automatically computes:

| Feature | Window | What It Detects |
|---|---|---|
| temperature_rolling_mean_10 | 10 samples | Short-term average |
| temperature_rolling_std_10 | 10 samples | Short-term volatility |
| temperature_rolling_mean_60 | 60 samples | 1-minute trend |
| temperature_rolling_mean_300 | 300 samples (5 minutes) | Longer baseline |
| temperature_lag_1 | Previous sample | Sudden jumps |
| temperature_lag_10 | 10 samples ago | Comparison to recent values |

Why this matters:

  • A sensor reading of 80°F might be normal if it's been trending up
  • The same 80°F is anomalous if it jumped from 72°F suddenly
  • Rolling features capture this context automatically
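The detector computes these features server-side, but the intuition is easy to reproduce locally. This standalone sketch (standard library only; the helper is hypothetical, not a LlamaFarm API) mimics a rolling mean, rolling std, and lag-1 difference for the two 80°F scenarios above:

```python
# Standalone sketch of why rolling features add context: the same final
# reading (80°F) produces very different lag features depending on history.
from statistics import mean, stdev

def rolling_features(readings, window=10):
    """Compute a rolling mean/std over the last `window` readings
    plus the lag-1 difference (current minus previous reading)."""
    recent = readings[-window:]
    return {
        "rolling_mean": round(mean(recent), 2),
        "rolling_std": round(stdev(recent), 2),
        "lag_1_diff": round(readings[-1] - readings[-2], 2),
    }

gradual = [72 + 0.8 * i for i in range(11)]  # slow ramp from 72 up to 80
sudden = [72.0] * 10 + [80.0]                # flat at 72, then a jump to 80

print(rolling_features(gradual))  # lag_1_diff = 0.8: consistent with the trend
print(rolling_features(sudden))   # lag_1_diff = 8.0: stands out sharply
```

Both series end at 80°F, but the lag-1 difference separates "trending up" from "jumped suddenly", which is exactly the context the detector uses.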

Step 4: Handling Concept Drift

Sensors naturally drift with:

  • Seasonal temperature changes
  • Equipment aging
  • Calibration shifts

The streaming API handles this automatically:

```python
# The sliding window naturally adapts
window_size = 3600      # Keep 1 hour

# Periodic retraining incorporates new patterns
retrain_interval = 500  # Every ~8 minutes

# Result: the model continuously adapts to gradual changes
# while still detecting sudden anomalies
```

For sudden operational changes (e.g., switching to night shift):

```python
# Reset the detector to restart learning
await client.post(f"{LLAMAFARM_URL}/anomaly/stream/{MODEL_ID}/reset")
print("Detector reset - starting fresh learning phase")
```

Step 5: Monitoring Detector Health

```python
async def monitor_detector_health():
    """Periodically check detector status."""
    async with httpx.AsyncClient() as client:
        while True:
            response = await client.get(
                f"{LLAMAFARM_URL}/anomaly/stream/{MODEL_ID}"
            )
            stats = response.json()

            print(f"""
Detector Health:
  - Status: {stats['status']}
  - Model Version: {stats['model_version']}
  - Samples Collected: {stats['samples_collected']}
  - Total Processed: {stats['total_processed']}
  - Samples Since Retrain: {stats['samples_since_retrain']}
  - Is Ready: {stats['is_ready']}
""")

            await asyncio.sleep(60)  # Check every minute
```

Step 6: Using Polars Buffers for Custom Analysis

For advanced analysis beyond anomaly detection:

```python
async def analyze_with_polars(sensor_data: dict):
    """Use Polars buffers for custom feature engineering.

    Args:
        sensor_data: Sensor reading dict with values like temperature, humidity, etc.
    """
    async with httpx.AsyncClient() as client:
        # Create dedicated analysis buffer
        await client.post(f"{LLAMAFARM_URL}/polars/buffers", json={
            "buffer_id": "sensor-analysis",
            "window_size": 3600,  # 1 hour
        })

        # In your streaming loop, also append to the analysis buffer
        await client.post(f"{LLAMAFARM_URL}/polars/append", json={
            "buffer_id": "sensor-analysis",
            "data": sensor_data,
        })

        # Get custom features for dashboard/reporting
        features = await client.post(f"{LLAMAFARM_URL}/polars/features", json={
            "buffer_id": "sensor-analysis",
            "rolling_windows": [60, 300, 900],  # 1 min, 5 min, 15 min
            "include_rolling_stats": ["mean", "std", "min", "max"],
            "include_lags": True,
            "tail": 100,  # Last 100 readings with features
        })

        return features.json()["data"]
```

Complete Example

```python
#!/usr/bin/env python3
"""Complete IoT sensor monitoring example."""

import asyncio
import random
from datetime import datetime

import httpx

LLAMAFARM_URL = "http://localhost:14345/v1/ml"
MODEL_ID = "factory-sensors"


async def main():
    """Run the sensor monitoring system."""
    async with httpx.AsyncClient(timeout=30.0) as client:
        print("🏭 Starting IoT Sensor Monitor")
        print(f"   Model: {MODEL_ID}")
        print("   Backend: ecod")
        print("-" * 50)

        sample_count = 0
        anomaly_count = 0

        while True:
            # Generate sensor reading
            temp = 72 + random.gauss(0, 2)
            humidity = 45 + random.gauss(0, 3)
            pressure = 1013 + random.gauss(0, 5)

            # Inject an anomaly roughly every 50 samples once warmed up
            if sample_count > 100 and random.random() < 0.02:
                temp = random.choice([50, 95])
                print(f"💉 Injecting anomaly: temp={temp}")

            sensor_data = {
                "temperature": round(temp, 2),
                "humidity": round(humidity, 2),
                "pressure": round(pressure, 2),
            }

            # Stream to detector
            response = await client.post(
                f"{LLAMAFARM_URL}/anomaly/stream",
                json={
                    "model": MODEL_ID,
                    "data": sensor_data,
                    "backend": "ecod",
                    "min_samples": 50,
                    "retrain_interval": 100,
                    "window_size": 1000,
                    "threshold": 0.5,
                    "rolling_windows": [5, 10, 20],
                    "include_lags": True,
                },
            )
            result = response.json()
            sample_count += 1

            # Handle response
            if result["status"] == "collecting":
                if sample_count % 10 == 0:
                    print(f"⏳ Collecting... {result['samples_until_ready']} until ready")
            else:
                for r in result["results"]:
                    if r["is_anomaly"]:
                        anomaly_count += 1
                        print(f"🚨 ANOMALY #{anomaly_count}: score={r['score']:.3f}, "
                              f"data={sensor_data}")

                if sample_count % 50 == 0:
                    print(f"✅ Sample {sample_count}: "
                          f"anomalies={anomaly_count}, "
                          f"version={result['model_version']}")

            await asyncio.sleep(0.1)  # 10 samples/second for demo


if __name__ == "__main__":
    asyncio.run(main())
```

Performance Considerations

Edge Device Optimization

For resource-constrained edge devices:

```python
# Use a lighter backend
"backend": "loda",     # Lower memory than ecod

# Smaller windows
"window_size": 500,    # ~8 minutes instead of 1 hour
"min_samples": 30,     # Faster cold start

# Disable rolling features to save memory
# (let the backend handle pattern detection)
"rolling_windows": None,
"include_lags": False,
```

High-Throughput Optimization

For >1000 samples/second:

```python
# Batch multiple readings into one request
batch = [read_sensors() for _ in range(100)]
await client.post(url, json={**config, "data": batch})

# Use the fastest backend
"backend": "hbos",          # Histogram-based, extremely fast

# Retrain less frequently
"retrain_interval": 5000,
```

Troubleshooting

| Issue | Solution |
|---|---|
| Too many false positives | Increase threshold (0.6-0.7) or contamination (0.1-0.2) |
| Missing real anomalies | Decrease threshold (0.3-0.4) |
| Slow cold start | Decrease min_samples (30-50) |
| Memory growing | Decrease window_size |
| Model not adapting | Decrease retrain_interval |
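As a sketch of how these fixes map onto the configuration from Step 1 (the field names match that config; the specific replacement values below are illustrative starting points, not recommendations from the API):

```python
# Start from the Step 1 detector configuration
config = {
    "threshold": 0.5,
    "contamination": 0.05,
    "min_samples": 100,
    "window_size": 3600,
    "retrain_interval": 500,
}

# Too many false positives: raise the threshold and the expected
# contamination so fewer borderline scores are flagged
config.update(threshold=0.65, contamination=0.1)

# Model not adapting: retrain more often so the sliding window's
# newer samples reach the model sooner
config["retrain_interval"] = 250

print(config)
```

Adjust one knob at a time and watch the anomaly rate for a few retrain cycles before changing another, so you can attribute the effect.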

Next Steps