Agent skill
Sensor Data Processing
Handling ingestion, storage, and analysis of time-series data from IoT devices using TimescaleDB, stream processing, aggregation, and anomaly detection for scalable IoT data pipelines.
Stars
163
Forks
31
Install this agent skill to your Project
npx add-skill https://github.com/majiayu000/claude-skill-registry/tree/main/skills/data/sensor-data-processing
SKILL.md
Sensor Data Processing
Current Level: Advanced
Domain: IoT / Data Engineering
Overview
Sensor data processing handles ingestion, storage, and analysis of time-series data from IoT devices. This guide covers TimescaleDB, stream processing, and anomaly detection for building efficient data pipelines that handle high-volume sensor data.
Data Ingestion Patterns
python
# mqtt_ingestion.py
import paho.mqtt.client as mqtt
import json
from datetime import datetime
import psycopg2
class SensorDataIngestion:
    """Subscribe to MQTT sensor topics and persist each reading to PostgreSQL.

    Every message received under ``sensors/#`` is decoded as JSON and written
    as one row of the ``sensor_data`` table.
    """

    def __init__(self, broker_url, db_config):
        """Open the database connection and connect to the MQTT broker.

        Args:
            broker_url: Passed unchanged as the host argument of
                ``Client.connect`` (port 1883, keepalive 60 seconds).
            db_config: Keyword arguments forwarded to ``psycopg2.connect``.
        """
        # NOTE(review): paho-mqtt >= 2.0 expects a callback API version as the
        # first Client() argument -- confirm which paho version is pinned.
        self.client = mqtt.Client()
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        self.conn = psycopg2.connect(**db_config)
        self.cursor = self.conn.cursor()
        self.client.connect(broker_url, 1883, 60)

    def on_connect(self, client, userdata, flags, rc):
        """Report the connection result and subscribe to all sensor topics."""
        print(f"Connected with result code {rc}")
        client.subscribe("sensors/#")

    def on_message(self, client, userdata, msg):
        """Decode one MQTT message as JSON and store it.

        Failures are reported and swallowed on purpose so that a single bad
        message does not stop the ingestion loop.
        """
        try:
            data = json.loads(msg.payload.decode())
            self.store_sensor_data(msg.topic, data)
        except Exception as e:
            print(f"Error processing message: {e}")

    def store_sensor_data(self, topic, data):
        """Insert one reading into ``sensor_data`` and commit.

        Args:
            topic: MQTT topic the reading arrived on (currently unused).
            data: Decoded payload. The keys ``device_id``, ``sensor_type``,
                ``value`` and ``unit`` are stored as given; ``timestamp`` is
                read as epoch seconds and defaults to the current time when
                missing or null.

        Raises:
            Exception: Whatever the database driver raised; the transaction
                is rolled back before the error propagates.
        """
        # Local import: the file-level import only brings `datetime` into scope.
        from datetime import timezone

        raw_ts = data.get('timestamp')
        if raw_ts is None:
            reading_time = datetime.now(timezone.utc)
        else:
            # Timezone-aware value, so the TIMESTAMPTZ column does not depend
            # on the host's or the session's local timezone.
            reading_time = datetime.fromtimestamp(raw_ts, tz=timezone.utc)

        # The table's time column is named `time`.
        query = """
            INSERT INTO sensor_data (device_id, sensor_type, value, unit, time)
            VALUES (%s, %s, %s, %s, %s)
        """
        try:
            self.cursor.execute(query, (
                data.get('device_id'),
                data.get('sensor_type'),
                data.get('value'),
                data.get('unit'),
                reading_time,
            ))
            self.conn.commit()
        except Exception:
            # Without a rollback the connection stays in an aborted
            # transaction and every later INSERT fails as well.
            self.conn.rollback()
            raise

    def start(self):
        """Run the MQTT network loop; blocks until the client disconnects."""
        self.client.loop_forever()
# Usage
ingestion = SensorDataIngestion(
    # The value is handed to Client.connect() as the host, so it must be a
    # bare hostname rather than an mqtt:// URL.
    broker_url='broker.hivemq.com',
    # Placeholder connection settings for a local database.
    db_config={
        'host': 'localhost',
        'database': 'iot_data',
        'user': 'postgres',
        'password': 'password',
    },
)
# Blocks while the MQTT network loop runs.
ingestion.start()
Time-Series Data Storage
TimescaleDB Schema
sql
-- Create extension
CREATE EXTENSION IF NOT EXISTS timescaledb;

-- sensor_data table: one row per (time, device, sensor type) reading
CREATE TABLE sensor_data (
    time TIMESTAMPTZ NOT NULL,
    device_id VARCHAR(255) NOT NULL,
    sensor_type VARCHAR(100) NOT NULL,
    value DOUBLE PRECISION NOT NULL,
    unit VARCHAR(50),
    metadata JSONB,
    PRIMARY KEY (time, device_id, sensor_type)
);

-- Convert to hypertable
SELECT create_hypertable('sensor_data', 'time');

-- Create indexes
CREATE INDEX idx_device_time ON sensor_data (device_id, time DESC);
CREATE INDEX idx_sensor_type ON sensor_data (sensor_type, time DESC);

-- Continuous aggregates
CREATE MATERIALIZED VIEW sensor_data_hourly
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 hour', time) AS hour,
    device_id,
    sensor_type,
    AVG(value) as avg_value,
    MIN(value) as min_value,
    MAX(value) as max_value,
    COUNT(*) as sample_count
FROM sensor_data
GROUP BY hour, device_id, sensor_type;

-- Refresh policy: without one the hourly aggregate is not kept up to date
-- as new rows arrive
SELECT add_continuous_aggregate_policy('sensor_data_hourly',
    start_offset => INTERVAL '3 hours',
    end_offset => INTERVAL '1 hour',
    schedule_interval => INTERVAL '1 hour');

-- Retention policy
SELECT add_retention_policy('sensor_data', INTERVAL '90 days');

-- Compression has to be enabled on the hypertable before a compression
-- policy can be attached to it
ALTER TABLE sensor_data SET (
    timescaledb.compress,
    timescaledb.compress_segmentby = 'device_id, sensor_type',
    timescaledb.compress_orderby = 'time DESC'
);

-- Compression policy
SELECT add_compression_policy('sensor_data', INTERVAL '7 days');
Data Access Layer
python
# timescale_repository.py
from datetime import datetime, timedelta
import psycopg2
from typing import List, Dict
class TimescaleRepository:
    """Data-access helper for the ``sensor_data`` table.

    Holds a single psycopg2 connection created in the constructor and opens
    a short-lived cursor for every call.
    """

    def __init__(self, db_config):
        """Connect using ``db_config`` as keyword arguments to ``psycopg2.connect``."""
        self.conn = psycopg2.connect(**db_config)

    def insert_sensor_data(self, device_id: str, sensor_type: str,
                           value: float, unit: str, metadata: dict = None):
        """Insert one reading stamped with the database's ``NOW()`` and commit.

        Args:
            device_id: Identifier of the reporting device.
            sensor_type: Kind of sensor that produced the value.
            value: The measured value.
            unit: Unit label stored alongside the value.
            metadata: Optional dict stored as JSON; a missing or empty dict
                is stored as NULL.

        Raises:
            Exception: Whatever the driver raised; the transaction is rolled
                back before the error propagates.
        """
        # Local import: this snippet's import block does not bring json into scope.
        import json

        query = """
            INSERT INTO sensor_data (time, device_id, sensor_type, value, unit, metadata)
            VALUES (NOW(), %s, %s, %s, %s, %s)
        """
        encoded_metadata = json.dumps(metadata) if metadata else None
        with self.conn.cursor() as cursor:
            try:
                cursor.execute(query, (device_id, sensor_type, value, unit,
                                       encoded_metadata))
                self.conn.commit()
            except Exception:
                # Leave the shared connection usable for the next call.
                self.conn.rollback()
                raise

    def get_latest_reading(self, device_id: str, sensor_type: str) -> Dict:
        """Return the newest reading for a device/sensor pair.

        Returns:
            A dict with ``time``, ``value`` and ``unit``, or ``None`` when no
            row matches.
        """
        query = """
            SELECT time, value, unit
            FROM sensor_data
            WHERE device_id = %s AND sensor_type = %s
            ORDER BY time DESC
            LIMIT 1
        """
        with self.conn.cursor() as cursor:
            cursor.execute(query, (device_id, sensor_type))
            row = cursor.fetchone()
        if row:
            return {
                'time': row[0],
                'value': row[1],
                'unit': row[2]
            }
        return None

    def get_time_series(self, device_id: str, sensor_type: str,
                        start_time: datetime, end_time: datetime) -> List[Dict]:
        """Return all readings in ``[start_time, end_time]``, oldest first.

        Each element is a dict with ``time``, ``value`` and ``unit``.
        """
        query = """
            SELECT time, value, unit
            FROM sensor_data
            WHERE device_id = %s
            AND sensor_type = %s
            AND time BETWEEN %s AND %s
            ORDER BY time ASC
        """
        with self.conn.cursor() as cursor:
            cursor.execute(query, (device_id, sensor_type, start_time, end_time))
            rows = cursor.fetchall()
        return [
            {'time': row[0], 'value': row[1], 'unit': row[2]}
            for row in rows
        ]

    def get_aggregated_data(self, device_id: str, sensor_type: str,
                            interval: str, start_time: datetime, end_time: datetime):
        """Return per-bucket statistics for readings in ``[start_time, end_time]``.

        Args:
            device_id: Identifier of the device to query.
            sensor_type: Kind of sensor to query.
            interval: Bucket width as an interval string such as ``'1 hour'``.
            start_time: Inclusive lower bound on ``time``.
            end_time: Inclusive upper bound on ``time``.

        Returns:
            A list of dicts with ``time`` (bucket start), ``avg``, ``min``,
            ``max`` and ``count``, ordered by bucket ascending.
        """
        # The interval is bound as a query parameter and cast server-side;
        # formatting it into the SQL text would allow SQL injection.
        query = """
            SELECT
                time_bucket(%s::interval, time) AS bucket,
                AVG(value) as avg_value,
                MIN(value) as min_value,
                MAX(value) as max_value,
                COUNT(*) as count
            FROM sensor_data
            WHERE device_id = %s
            AND sensor_type = %s
            AND time BETWEEN %s AND %s
            GROUP BY bucket
            ORDER BY bucket ASC
        """
        with self.conn.cursor() as cursor:
            cursor.execute(query, (interval, device_id, sensor_type,
                                   start_time, end_time))
            rows = cursor.fetchall()
        return [
            {
                'time': row[0],
                'avg': row[1],
                'min': row[2],
                'max': row[3],
                'count': row[4]
            }
            for row in rows
        ]
Data Validation
python
# data_validator.py
from typing import Dict, List
import numpy as np
class SensorDataValidator:
    """Range/unit validation and z-score outlier detection for sensor values."""

    def __init__(self):
        """Set up the per-sensor-type rules (allowed range and expected unit)."""
        self.validation_rules = {
            'temperature': {'min': -50, 'max': 100, 'unit': 'celsius'},
            'humidity': {'min': 0, 'max': 100, 'unit': 'percent'},
            'pressure': {'min': 800, 'max': 1200, 'unit': 'hPa'}
        }

    def validate(self, sensor_type: str, value: float, unit: str) -> Dict:
        """Check one reading against the rules for its sensor type.

        Args:
            sensor_type: Key into ``validation_rules``; types without a rule
                are always reported as valid.
            value: The measured value.
            unit: Unit label that came with the value.

        Returns:
            ``{'valid': bool, 'warnings': [str, ...]}`` where ``valid`` is
            True only when no warning was produced.
        """
        rules = self.validation_rules.get(sensor_type)
        if rules is None:
            return {'valid': True, 'warnings': []}

        warnings = []
        # Written as a positive "inside the range" test so that NaN, for
        # which every comparison is False, is reported as out of range.
        if not (rules['min'] <= value <= rules['max']):
            warnings.append(f"Value {value} outside valid range [{rules['min']}, {rules['max']}]")
        if unit != rules['unit']:
            warnings.append(f"Unexpected unit '{unit}', expected '{rules['unit']}'")
        return {
            'valid': len(warnings) == 0,
            'warnings': warnings
        }

    def detect_outliers(self, values: List[float], threshold: float = 3.0) -> List[int]:
        """Return the indices of values whose absolute z-score exceeds ``threshold``.

        Fewer than three values, or values with zero standard deviation,
        yield an empty list.
        """
        if len(values) < 3:
            return []
        samples = np.asarray(values, dtype=float)
        std = samples.std()
        if std == 0:
            return []
        z_scores = np.abs(samples - samples.mean()) / std
        # tolist() converts the numpy integers back to plain Python ints.
        return np.flatnonzero(z_scores > threshold).tolist()
Real-time Processing
python
# stream_processor.py
from kafka import KafkaConsumer, KafkaProducer
import json
class StreamProcessor:
    """Consume sensor readings from Kafka, add a moving average, flag anomalies."""

    def __init__(self, kafka_brokers):
        """Create the consumer for ``sensor-data`` and a producer for results.

        Args:
            kafka_brokers: Value passed as ``bootstrap_servers`` to both the
                consumer and the producer.
        """
        self.consumer = KafkaConsumer(
            'sensor-data',
            bootstrap_servers=kafka_brokers,
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )
        self.producer = KafkaProducer(
            bootstrap_servers=kafka_brokers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )

    def process_stream(self):
        """Process messages until the consumer is exhausted.

        For every message the value is appended to a sliding window of the
        last 10 values for the same device and sensor type. The message is
        re-published to ``processed-data`` with an added ``moving_avg``
        field, and additionally to ``anomalies`` when the value deviates from
        the moving average by more than two standard deviations of the window.
        """
        # Local imports: neither name is provided by this snippet's imports.
        from collections import defaultdict, deque
        from statistics import pstdev

        window_size = 10
        # One window per (device, sensor type). A single shared window would
        # average unrelated series together (e.g. temperature with pressure).
        # deque(maxlen=...) drops the oldest value automatically.
        windows = defaultdict(lambda: deque(maxlen=window_size))

        for message in self.consumer:
            data = message.value
            value = data['value']

            window = windows[(data.get('device_id'), data.get('sensor_type'))]
            window.append(value)

            # Calculate moving average
            moving_avg = sum(window) / len(window)

            # Detect anomalies (population standard deviation of the window)
            if abs(value - moving_avg) > 2 * pstdev(window):
                self.producer.send('anomalies', {
                    'device_id': data['device_id'],
                    'sensor_type': data['sensor_type'],
                    'value': value,
                    'expected': moving_avg,
                    'timestamp': data['timestamp']
                })

            # Send processed data
            self.producer.send('processed-data', {
                **data,
                'moving_avg': moving_avg
            })
Anomaly Detection
python
# anomaly_detector.py
import numpy as np
from sklearn.ensemble import IsolationForest
class AnomalyDetector:
    """Anomaly detection on single values using scikit-learn's IsolationForest."""

    def __init__(self):
        """Create an untrained model (contamination 0.1, fixed random seed 42)."""
        self.model = IsolationForest(contamination=0.1, random_state=42)
        self.is_trained = False

    def train(self, historical_data: List[float]):
        """Train on historical normal data.

        Args:
            historical_data: Values reshaped into a single-feature column
                before fitting.
        """
        X = np.array(historical_data).reshape(-1, 1)
        self.model.fit(X)
        self.is_trained = True

    def detect(self, value: float) -> bool:
        """Return True if ``value`` is predicted to be an anomaly.

        Always returns False while the model is untrained.
        """
        if not self.is_trained:
            return False
        prediction = self.model.predict([[value]])
        # -1 indicates anomaly; bool() turns the numpy boolean into a plain
        # Python bool so the result matches the annotation and serializes.
        return bool(prediction[0] == -1)

    def detect_batch(self, values: List[float]) -> List[bool]:
        """Return one anomaly flag per input value.

        Returns all False while the model is untrained, and an empty list for
        empty input without calling the model.
        """
        if not self.is_trained or len(values) == 0:
            return [False] * len(values)
        X = np.array(values).reshape(-1, 1)
        predictions = np.asarray(self.model.predict(X))
        # tolist() yields plain Python bools rather than numpy booleans.
        return (predictions == -1).tolist()
# Statistical anomaly detection
class StatisticalAnomalyDetector:
    """Z-score anomaly check against a bounded history of recent values."""

    def __init__(self, window_size: int = 100):
        """Start with an empty history capped at ``window_size`` entries."""
        self.history = []
        self.window_size = window_size

    def add_value(self, value: float):
        """Record ``value``, discarding the oldest entry once the cap is exceeded."""
        self.history.append(value)
        overflow = len(self.history) - self.window_size
        if overflow > 0:
            del self.history[0]

    def is_anomaly(self, value: float, threshold: float = 3.0) -> bool:
        """Report whether ``value`` lies more than ``threshold`` deviations from the mean.

        Returns False while fewer than 10 values are recorded or when the
        recorded values have zero standard deviation.
        """
        if len(self.history) >= 10:
            spread = np.std(self.history)
            if spread != 0:
                center = np.mean(self.history)
                return abs((value - center) / spread) > threshold
        return False
Data Compression
python
# data_compression.py
class DataCompression:
    """Stateless helpers for shrinking numeric time-series data."""

    @staticmethod
    def delta_encoding(values: List[float]) -> List[float]:
        """Delta-encode a series.

        Returns:
            The first value followed by each value's difference from its
            predecessor; an empty list for empty input.
        """
        if not values:
            return []
        return [values[0]] + [
            current - previous
            for previous, current in zip(values, values[1:])
        ]

    @staticmethod
    def delta_decoding(compressed: List[float]) -> List[float]:
        """Rebuild the original series from its delta encoding (running sum)."""
        # Local import: the snippet has no import block of its own.
        from itertools import accumulate

        return list(accumulate(compressed))

    @staticmethod
    def downsample(values: List[float], factor: int) -> List[float]:
        """Downsample by averaging consecutive chunks of ``factor`` values.

        The final chunk may be shorter than ``factor`` and is averaged over
        its actual length.

        Raises:
            ValueError: If ``factor`` is smaller than 1. Zero already failed
                inside ``range``; a negative factor used to return an empty
                list without any error.
        """
        if factor < 1:
            raise ValueError(f"factor must be a positive integer, got {factor}")
        downsampled = []
        for start in range(0, len(values), factor):
            chunk = values[start:start + factor]
            downsampled.append(sum(chunk) / len(chunk))
        return downsampled
Best Practices
- Time-Series DB - Use TimescaleDB or InfluxDB
- Compression - Enable compression for old data
- Retention - Set appropriate retention policies
- Aggregation - Pre-aggregate data for performance
- Validation - Validate all incoming data
- Anomaly Detection - Implement real-time detection
- Batch Processing - Use for historical analysis
- Indexing - Index by device and time
- Partitioning - Partition by time
- Monitoring - Monitor ingestion rates
Quick Start
TimescaleDB Setup
sql
-- Plain table that will back the hypertable
CREATE TABLE sensor_data (
    time        TIMESTAMPTZ       NOT NULL,
    device_id   TEXT              NOT NULL,
    sensor_type TEXT              NOT NULL,
    value       DOUBLE PRECISION,
    metadata    JSONB
);

-- Turn the table into a hypertable keyed on its time column
SELECT create_hypertable('sensor_data', 'time');

-- Write one sample reading
INSERT INTO sensor_data (time, device_id, sensor_type, value)
VALUES (NOW(), 'device-001', 'temperature', 25.5);
Stream Processing
python
import paho.mqtt.client as mqtt
from timescaledb import TimescaleDB
def on_message(client, userdata, message):
    """Store one incoming MQTT reading and raise an alert if it is anomalous.

    Args:
        client: MQTT client that delivered the message (unused).
        userdata: User data attached to the client (unused).
        message: Incoming message; its payload is parsed as JSON.

    NOTE(review): assumes ``db`` exposes
    ``insert_sensor_data(device_id, sensor_type, value, unit, metadata=None)``
    like ``TimescaleRepository`` -- confirm against the object actually used.
    ``db``, ``is_anomaly`` and ``send_alert`` are expected to be defined by
    the surrounding application.
    """
    # Local import: this snippet's import block does not bring json into scope.
    import json

    data = json.loads(message.payload)

    # Process and store. The payload key is `sensor_type`, matching the other
    # examples; no timestamp is passed because the repository stamps rows
    # with the database's NOW().
    db.insert_sensor_data(
        device_id=data['device_id'],
        sensor_type=data['sensor_type'],
        value=data['value'],
        unit=data.get('unit'),
    )

    # Detect anomalies
    if is_anomaly(data['value']):
        send_alert(data)
Production Checklist
- Time-Series DB: TimescaleDB or similar
- Data Ingestion: Efficient data ingestion
- Stream Processing: Real-time stream processing
- Aggregation: Pre-aggregate data
- Retention: Set retention policies
- Validation: Validate incoming data
- Anomaly Detection: Real-time anomaly detection
- Batch Processing: Batch processing for analysis
- Indexing: Index by device and time
- Partitioning: Partition by time
- Monitoring: Monitor ingestion rates
- Documentation: Document data pipeline
Anti-patterns
❌ Don't: Regular Database
sql
-- ❌ Bad - Regular table that is never converted to a hypertable
CREATE TABLE sensor_data (
id SERIAL PRIMARY KEY,
device_id TEXT,
value DOUBLE PRECISION,
timestamp TIMESTAMP
);
-- Not optimized for time-series: no create_hypertable() call follows, and the
-- time column is a TIMESTAMP without time zone and allows NULL.
sql
-- ✅ Good - Hypertable
CREATE TABLE sensor_data (
    time      TIMESTAMPTZ       NOT NULL,
    device_id TEXT              NOT NULL,
    value     DOUBLE PRECISION
);

-- Converting the table is what makes it a hypertable keyed on `time`
SELECT create_hypertable('sensor_data', 'time');
-- Optimized for time-series!
❌ Don't: No Retention
sql
-- ❌ Bad - Keep all data forever
-- Storage grows indefinitely!
sql
-- ✅ Good - Retention policy on the sensor_data hypertable
SELECT add_retention_policy('sensor_data', INTERVAL '1 year');
-- Auto-delete old data: rows older than one year are removed
Integration Points
- Real-time Monitoring (36-iot-integration/real-time-monitoring/) - Monitoring dashboards
- Edge Computing (36-iot-integration/edge-computing/) - Edge processing
- Database Optimization (04-database/database-optimization/) - Query optimization
Further Reading
Resources
Didn't find the tool you were looking for?