
Data Freshness and Latency

Monitoring and optimizing how quickly data flows through pipelines and ensuring it meets timeliness requirements.


Install this agent skill into your project:

npx add-skill https://github.com/majiayu000/claude-skill-registry/tree/main/skills/data/freshness-latency


Overview

Data Freshness measures how current the data is (age), while Latency measures how long it takes for data to flow through the pipeline (processing time). Both are critical for real-time analytics, operational dashboards, and time-sensitive decision-making.

Core Principle: "Stale data leads to stale decisions. Monitor freshness, optimize latency."


1. Freshness vs. Latency

| Metric | Definition | Example | Measurement |
|---|---|---|---|
| Freshness | How old is the data? | Data is 5 minutes old | `NOW() - MAX(event_timestamp)` |
| Latency | How long does processing take? | Pipeline takes 2 minutes | `processing_end_time - event_timestamp` |

Example

Event occurs: 10:00:00
Event arrives in pipeline: 10:00:05 (5 sec ingestion latency)
Processing completes: 10:02:05 (2 min processing latency)
Data queried by user: 10:05:00

Freshness at query time: 5 minutes (10:05 - 10:00)
Total latency: 2 minutes 5 seconds
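The timeline above can be replayed in a few lines of Python. This sketch derives the figures from the stated latencies: 5 s of ingestion and 2 min of processing. The calendar date is arbitrary. It also assumes this event is the newest one in the table, so freshness equals its age.

```python
from datetime import datetime, timedelta

# Replay the example from its stated latencies (the calendar date is arbitrary)
event_time = datetime(2024, 1, 1, 10, 0, 0)
ingestion_latency = timedelta(seconds=5)
processing_latency = timedelta(minutes=2)

ingested_at = event_time + ingestion_latency       # 10:00:05
processed_at = ingested_at + processing_latency    # end of processing
queried_at = datetime(2024, 1, 1, 10, 5, 0)

# Latency: how long the event took to become queryable
total_latency = processed_at - event_time
# Freshness: age of the newest data at query time (assumes this is the newest event)
freshness_at_query = queried_at - event_time

print(f"Total latency: {total_latency}")                 # Total latency: 0:02:05
print(f"Freshness at query time: {freshness_at_query}")  # Freshness at query time: 0:05:00
```

Latency is a property of the pipeline. Freshness keeps growing until newer data lands, which is why it reaches 5 minutes even though processing finished much earlier.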

2. Freshness Requirements by Use Case

| Use Case | Freshness SLO | Acceptable Latency | Example |
|---|---|---|---|
| Real-time fraud detection | < 1 second | < 100ms | Credit card transaction scoring |
| Live dashboards | < 1 minute | < 10 seconds | Website analytics |
| Operational metrics | < 5 minutes | < 1 minute | Server health monitoring |
| Business intelligence | < 1 hour | < 15 minutes | Sales reports |
| Data warehouse | < 24 hours | < 4 hours | Historical analysis |
| Compliance reporting | < 7 days | Days | Annual audits |
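One way to make these targets actionable is to encode them as configuration and compare measured values against them. This is a minimal sketch. The `FRESHNESS_TIERS` dict and the `meets_requirements` helper are hypothetical names. Compliance reporting is left out because its latency target ("Days") has no precise bound.

```python
from datetime import timedelta

# Hypothetical encoding of the table above: use case -> (freshness SLO, acceptable latency)
FRESHNESS_TIERS = {
    "real_time_fraud_detection": (timedelta(seconds=1), timedelta(milliseconds=100)),
    "live_dashboards": (timedelta(minutes=1), timedelta(seconds=10)),
    "operational_metrics": (timedelta(minutes=5), timedelta(minutes=1)),
    "business_intelligence": (timedelta(hours=1), timedelta(minutes=15)),
    "data_warehouse": (timedelta(hours=24), timedelta(hours=4)),
}

def meets_requirements(use_case: str, data_age: timedelta, latency: timedelta) -> dict:
    """Compare a measured data age and pipeline latency with the use case's targets."""
    max_age, max_latency = FRESHNESS_TIERS[use_case]
    return {
        "freshness_ok": data_age < max_age,  # the table uses strict "<" bounds
        "latency_ok": latency < max_latency,
    }

print(meets_requirements("operational_metrics", timedelta(minutes=3), timedelta(seconds=90)))
# {'freshness_ok': True, 'latency_ok': False}
```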

3. Measuring Data Freshness

SQL Freshness Check

```sql
-- Check freshness of the latest record (PostgreSQL syntax)
SELECT
    MAX(created_at) AS latest_record,
    NOW() AS checked_at,
    EXTRACT(EPOCH FROM (NOW() - MAX(created_at))) / 60 AS age_minutes
FROM events;

-- Alert if data is stale (> 10 minutes old).
-- MAX() is NULL on an empty table, which would otherwise fall through to 'FRESH'.
SELECT
    CASE
        WHEN MAX(created_at) IS NULL THEN 'NO DATA'
        WHEN MAX(created_at) < NOW() - INTERVAL '10 minutes' THEN 'STALE'
        ELSE 'FRESH'
    END AS freshness_status
FROM events;
```
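The queries above use PostgreSQL functions. To try the same logic locally, here is a self-contained sketch using Python's built-in `sqlite3` module. The in-memory `events` table and the `freshness_status` helper are illustrative. SQLite has no native timestamp type, so timestamps are assumed to be stored as UTC ISO-8601 strings in one consistent format. That lets `MAX()` compare them as text.

```python
import sqlite3
from datetime import datetime, timedelta, timezone

def freshness_status(conn: sqlite3.Connection, max_age: timedelta, now: datetime) -> str:
    """Return 'NO DATA', 'STALE', or 'FRESH' for the newest row in `events`."""
    (latest,) = conn.execute("SELECT MAX(created_at) FROM events").fetchone()
    if latest is None:
        return "NO DATA"  # MAX() over an empty table is NULL
    age = now - datetime.fromisoformat(latest)
    return "STALE" if age > max_age else "FRESH"

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (id INTEGER PRIMARY KEY, created_at TEXT NOT NULL)")
now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)  # fixed clock for a repeatable demo

print(freshness_status(conn, timedelta(minutes=10), now))  # NO DATA
conn.execute("INSERT INTO events (created_at) VALUES (?)",
             ((now - timedelta(minutes=25)).isoformat(),))
print(freshness_status(conn, timedelta(minutes=10), now))  # STALE
conn.execute("INSERT INTO events (created_at) VALUES (?)",
             ((now - timedelta(minutes=3)).isoformat(),))
print(freshness_status(conn, timedelta(minutes=10), now))  # FRESH
```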

Python Freshness Monitoring

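```python
import pandas as pd

def check_freshness(df: pd.DataFrame, timestamp_col: str, max_age_minutes: int = 10) -> dict:
    """Check whether the newest row in `df` is recent enough."""
    latest_timestamp = pd.Timestamp(df[timestamp_col].max())  # NaT if df is empty

    if pd.isna(latest_timestamp):
        # No data at all counts as stale
        return {
            'is_fresh': False,
            'latest_timestamp': None,
            'age_minutes': float('inf'),
            'threshold_minutes': max_age_minutes,
        }

    # Use a tz-aware "now" when the column is tz-aware, so the subtraction is valid
    now = pd.Timestamp.now(tz=latest_timestamp.tz)
    age_minutes = (now - latest_timestamp).total_seconds() / 60

    return {
        'is_fresh': age_minutes <= max_age_minutes,
        'latest_timestamp': latest_timestamp,
        'age_minutes': age_minutes,
        'threshold_minutes': max_age_minutes,
    }

# Usage (`alert` is a placeholder for your notification hook)
result = check_freshness(df, 'event_time', max_age_minutes=10)
if not result['is_fresh']:
    alert(f"Data is stale: {result['age_minutes']:.1f} minutes old")
```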

dbt Source Freshness

dbt checks these thresholds when you run `dbt source freshness`.

```yaml
# models/sources.yml
version: 2

sources:
  - name: production
    database: analytics
    freshness:  # source-level default
      warn_after: {count: 12, period: hour}
      error_after: {count: 24, period: hour}

    tables:
      - name: events
        loaded_at_field: created_at
        freshness:  # table-level setting overrides the source default
          warn_after: {count: 10, period: minute}
          error_after: {count: 30, period: minute}
```
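dbt compares the age of the newest `loaded_at_field` value with `warn_after` and `error_after`, then reports pass, warn, or error. This sketch imitates that classification for the `events` thresholds. `classify_freshness` is a hypothetical helper, not dbt's code, and its boundary handling may differ from dbt's.

```python
from datetime import datetime, timedelta, timezone

# Table-level thresholds for `events` from the YAML above
WARN_AFTER = timedelta(minutes=10)
ERROR_AFTER = timedelta(minutes=30)

def classify_freshness(max_loaded_at: datetime, now: datetime) -> str:
    """Classify data age roughly the way a source freshness check reports it."""
    age = now - max_loaded_at
    if age > ERROR_AFTER:
        return "error"
    if age > WARN_AFTER:
        return "warn"
    return "pass"

now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
for minutes_old in (5, 15, 45):
    print(minutes_old, classify_freshness(now - timedelta(minutes=minutes_old), now))
# 5 pass
# 15 warn
# 45 error
```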

4. Latency Measurement

End-to-End Pipeline Latency

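```python
def measure_pipeline_latency(event_id: str) -> dict:
    """Measure latency from event creation to availability"""
    # get_event_timestamp, get_processed_timestamp, and latency_metrics are
    # placeholders for your source lookup, sink lookup, and metrics client.

    # Get event timestamp from source
    event_time = get_event_timestamp(event_id)

    # Get processing completion time
    processed_time = get_processed_timestamp(event_id)

    # Both timestamps must share a clock/timezone for the difference to be meaningful
    latency = (processed_time - event_time).total_seconds()

    # Record the sample; a histogram-style metric lets the backend compute percentiles
    latency_metrics.observe(latency)

    return {
        'event_id': event_id,
        'event_time': event_time,
        'processed_time': processed_time,
        'latency_seconds': latency
    }
```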
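Latency is usually reported as percentiles (P50/P95/P99) rather than an average, because a slow tail matters even when most events are fast. For an offline check over a batch of samples, Python's `statistics.quantiles` is enough. The helper name and sample values below are made up for illustration.

```python
import statistics

def latency_percentiles(samples_seconds: list[float]) -> dict:
    """Return P50/P95/P99 for a list of end-to-end latency samples (seconds)."""
    # n=100 yields 99 cut points; index k-1 is the k-th percentile
    cuts = statistics.quantiles(samples_seconds, n=100, method="inclusive")
    return {"p50": cuts[49], "p95": cuts[94], "p99": cuts[98]}

# Illustrative samples: mostly fast, with a slow tail
samples = [1.0] * 90 + [5.0] * 8 + [30.0] * 2
print(latency_percentiles(samples))  # {'p50': 1.0, 'p95': 5.0, 'p99': 30.0}
```

The mean of these samples is 1.9 s, which hides the 30-second tail that P99 exposes.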

Per-Stage Latency Tracking

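```python
import time

class PipelineStage:
    """Context manager that times one pipeline stage and reports it."""

    def __init__(self, name: str):
        self.name = name
        self.latency_seconds = None

    def __enter__(self):
        # perf_counter is monotonic, so wall-clock adjustments can't skew the timing
        self._start = time.perf_counter()
        return self

    def __exit__(self, *exc_info):
        self.latency_seconds = time.perf_counter() - self._start

        # Log to monitoring (`log_metric` is a placeholder for your metrics client)
        log_metric(f'pipeline.{self.name}.latency', self.latency_seconds)
        return False  # don't swallow exceptions raised inside the stage

# Usage (ingest_data, transform_data, load_data are placeholders)
with PipelineStage('ingestion'):
    ingest_data()

with PipelineStage('transformation'):
    transform_data()

with PipelineStage('loading'):
    load_data()
```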

Prometheus Metrics

```python
from prometheus_client import Histogram

# Define latency histogram
pipeline_latency = Histogram(
    'pipeline_latency_seconds',
    'Time taken for data to flow through pipeline',
    ['stage', 'source'],
    buckets=[0.1, 0.5, 1, 5, 10, 30, 60, 300]  # seconds
)

# Record latency
with pipeline_latency.labels(stage='transform', source='kafka').time():
    transform_data()
```

5. Freshness Monitoring and Alerting

Automated Freshness Checks

```python
import schedule  # third-party "schedule" package
import time

def monitor_freshness():
    """Check freshness of each monitored table once (scheduled below)"""
    tables = ['events', 'users', 'orders']

    for table in tables:
        # check_table_freshness and alert are placeholders for your freshness
        # query and your paging/Slack integration
        freshness = check_table_freshness(table)

        if not freshness['is_fresh']:
            alert(
                severity='warning',
                message=f"Table {table} is stale: {freshness['age_minutes']:.1f} minutes old",
                threshold=freshness['threshold_minutes']
            )

# Run every 5 minutes
schedule.every(5).minutes.do(monitor_freshness)

while True:
    schedule.run_pending()
    time.sleep(60)  # poll the scheduler once a minute
```

Watermark Tracking

```python
from datetime import datetime, timedelta

class WatermarkTracker:
    """Track high-water mark for streaming data.

    `processed_at` is assumed to hold the newest event timestamp that has been
    fully processed. `db` is a placeholder DB-API style connection, and
    `table_name` must be trusted because it is interpolated into the SQL.
    """

    def __init__(self, table_name: str):
        self.table_name = table_name
        self.watermark = self.load_watermark()

    def load_watermark(self) -> datetime:
        """Load last processed timestamp (datetime.min if nothing processed yet)"""
        result = db.execute(
            f"SELECT MAX(processed_at) FROM {self.table_name}_watermark"
        ).fetchone()
        return result[0] if result[0] else datetime.min

    def update_watermark(self, timestamp: datetime):
        """Update watermark after processing; never move it backwards"""
        if timestamp <= self.watermark:
            return
        db.execute(
            f"INSERT INTO {self.table_name}_watermark (processed_at) VALUES (%s)",
            (timestamp,)
        )
        self.watermark = timestamp

    def get_lag(self) -> timedelta:
        """Get lag between watermark and current time"""
        return datetime.now() - self.watermark

    def is_lagging(self, threshold_minutes: int = 10) -> bool:
        """Check if processing is lagging"""
        lag_minutes = self.get_lag().total_seconds() / 60
        return lag_minutes > threshold_minutes
```
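To see the watermark pattern end to end without a database server, here is a sketch of the same idea on `sqlite3`. It differs from the class above in one deliberate way, which is an assumption rather than the original design. It keeps one row per pipeline in a single `watermarks` table and upserts that row, instead of appending to a per-table `<table>_watermark` table. Naive timestamps in one ISO-8601 format are assumed, so they compare correctly as text. `now` is passed in explicitly to keep the demo deterministic.

```python
import sqlite3
from datetime import datetime, timedelta
from typing import Optional

class SqliteWatermarkTracker:
    """Hypothetical watermark tracker: one row per pipeline in a `watermarks` table."""

    def __init__(self, conn: sqlite3.Connection, pipeline: str):
        self.conn = conn
        self.pipeline = pipeline
        conn.execute(
            "CREATE TABLE IF NOT EXISTS watermarks ("
            "pipeline TEXT PRIMARY KEY, high_water_mark TEXT NOT NULL)"
        )

    def get_watermark(self) -> Optional[datetime]:
        row = self.conn.execute(
            "SELECT high_water_mark FROM watermarks WHERE pipeline = ?", (self.pipeline,)
        ).fetchone()
        return datetime.fromisoformat(row[0]) if row else None

    def update_watermark(self, event_time: datetime) -> None:
        # Only move forward: MAX() stops a late, older batch from rewinding the mark
        self.conn.execute(
            "INSERT INTO watermarks (pipeline, high_water_mark) VALUES (?, ?) "
            "ON CONFLICT(pipeline) DO UPDATE SET "
            "high_water_mark = MAX(high_water_mark, excluded.high_water_mark)",
            (self.pipeline, event_time.isoformat()),
        )
        self.conn.commit()

    def get_lag(self, now: datetime) -> Optional[timedelta]:
        mark = self.get_watermark()
        return None if mark is None else now - mark

conn = sqlite3.connect(":memory:")
tracker = SqliteWatermarkTracker(conn, "orders")
now = datetime(2024, 1, 1, 12, 0)
tracker.update_watermark(datetime(2024, 1, 1, 11, 50))
tracker.update_watermark(datetime(2024, 1, 1, 11, 40))  # late batch: ignored
print(tracker.get_lag(now))  # 0:10:00
```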

6. Improving Freshness

Change Data Capture (CDC)

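```python
# Debezium CDC example
# Instead of batch ETL every hour, stream changes in real time.
# insert_to_warehouse / update_warehouse / delete_from_warehouse are placeholders.

from kafka import KafkaConsumer  # kafka-python
import json

consumer = KafkaConsumer(
    'dbserver1.inventory.customers',
    bootstrap_servers=['localhost:9092'],
    # Debezium follows each delete with a tombstone (null value); keep it as None
    value_deserializer=lambda m: json.loads(m.decode('utf-8')) if m is not None else None
)

for message in consumer:
    change_event = message.value
    if change_event is None:
        continue  # tombstone: only used for log compaction

    # With the JSON converter's schemas enabled, the event is wrapped in "payload"
    change_event = change_event.get('payload', change_event)

    if change_event['op'] in ('c', 'r'):  # Create, or snapshot read
        insert_to_warehouse(change_event['after'])
    elif change_event['op'] == 'u':  # Update
        update_warehouse(change_event['after'])
    elif change_event['op'] == 'd':  # Delete
        delete_from_warehouse(change_event['before'])
```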
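To make the op codes concrete without a Kafka cluster, this sketch applies Debezium-style change events to an in-memory dict keyed by primary key. Besides `c`, `u`, and `d`, Debezium emits `r` for rows read during a snapshot and a null-valued tombstone after a delete. The event dicts here are simplified by hand, and `apply_change` is a hypothetical helper. Real events also carry metadata such as `source` and `ts_ms`.

```python
from typing import Optional

def apply_change(table: dict, event: Optional[dict], key: str = "id") -> None:
    """Apply one simplified Debezium-style change event to `table` (pk -> row)."""
    if event is None:
        return  # tombstone: nothing to apply
    op = event["op"]
    if op in ("c", "r", "u"):  # create, snapshot read, update: upsert the new row image
        row = event["after"]
        table[row[key]] = row
    elif op == "d":  # delete: the old row image identifies what to remove
        table.pop(event["before"][key], None)

warehouse: dict = {}
events = [
    {"op": "r", "before": None, "after": {"id": 1, "email": "a@example.com"}},
    {"op": "c", "before": None, "after": {"id": 2, "email": "b@example.com"}},
    {"op": "u", "before": {"id": 1, "email": "a@example.com"},
     "after": {"id": 1, "email": "a2@example.com"}},
    {"op": "d", "before": {"id": 2, "email": "b@example.com"}, "after": None},
    None,  # tombstone that follows the delete
]
for e in events:
    apply_change(warehouse, e)

print(warehouse)  # {1: {'id': 1, 'email': 'a2@example.com'}}
```

Treating creates, reads, and updates all as upserts makes replaying a stretch of the topic idempotent, which helps after a consumer restart.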

Incremental Updates

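```sql
-- Instead of full table refresh
-- DELETE FROM target_table;
-- INSERT INTO target_table SELECT * FROM source_table;

-- Use incremental update (PostgreSQL syntax; assumes a unique constraint on id).
-- COALESCE covers the first run, when target_table is empty and MAX() is NULL;
-- >= re-reads rows sharing the last timestamp, which the upsert makes harmless.
INSERT INTO target_table
SELECT * FROM source_table
WHERE updated_at >= COALESCE((SELECT MAX(updated_at) FROM target_table), '-infinity')
ON CONFLICT (id) DO UPDATE SET
    column1 = EXCLUDED.column1,
    updated_at = EXCLUDED.updated_at;
```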
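The same incremental pattern can be exercised locally with `sqlite3`, which supports `ON CONFLICT ... DO UPDATE` from SQLite 3.24 onward. In this sketch the tables and columns are illustrative. `updated_at` values are ISO-8601 strings so they compare correctly as text, and `''` stands in for "no high-water mark yet". The `WHERE` clause also avoids the parsing ambiguity SQLite documents for `INSERT ... SELECT` combined with an upsert.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE source_table (id INTEGER PRIMARY KEY, column1 TEXT, updated_at TEXT);
    CREATE TABLE target_table (id INTEGER PRIMARY KEY, column1 TEXT, updated_at TEXT);
""")

INCREMENTAL_UPSERT = """
    INSERT INTO target_table (id, column1, updated_at)
    SELECT id, column1, updated_at FROM source_table
    WHERE updated_at >= COALESCE((SELECT MAX(updated_at) FROM target_table), '')
    ON CONFLICT (id) DO UPDATE SET
        column1 = excluded.column1,
        updated_at = excluded.updated_at
"""

def sync() -> None:
    """Copy rows changed at or after the target's high-water mark."""
    with conn:  # commit the upsert as one transaction
        conn.execute(INCREMENTAL_UPSERT)

conn.executemany("INSERT INTO source_table VALUES (?, ?, ?)", [
    (1, "a", "2024-01-01T10:00:00"),
    (2, "b", "2024-01-01T10:05:00"),
])
sync()  # first run: target is empty, so every row qualifies

conn.execute("UPDATE source_table SET column1 = 'b2', updated_at = '2024-01-01T10:10:00' WHERE id = 2")
conn.execute("INSERT INTO source_table VALUES (3, 'c', '2024-01-01T10:12:00')")
sync()  # second run: only rows at or after 10:05:00 are read; row 1 is skipped

print(conn.execute("SELECT * FROM target_table ORDER BY id").fetchall())
```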

Parallel Processing

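```python
from concurrent.futures import ThreadPoolExecutor
import numpy as np
import pandas as pd

def process_partition(partition_df: pd.DataFrame):
    """Process a partition of data (`transform`, `load_to_warehouse` are placeholders)"""
    # Transform and load
    transformed = transform(partition_df)
    load_to_warehouse(transformed)

# Split rows into 10 roughly equal partitions by position
partitions = [large_df.iloc[idx] for idx in np.array_split(np.arange(len(large_df)), 10)]

# Threads help most when the work is I/O-bound (e.g. warehouse loads);
# for CPU-heavy pandas transforms, ProcessPoolExecutor avoids the GIL.
with ThreadPoolExecutor(max_workers=10) as executor:
    # list() consumes the results so exceptions raised in workers surface here
    list(executor.map(process_partition, partitions))
```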

7. Trade-offs

Freshness vs. Cost

Real-time streaming (< 1 min freshness):
- Cost: $$$$ (Kafka, Flink, dedicated infrastructure)
- Use when: Fraud detection, live dashboards

Micro-batch (5-15 min freshness):
- Cost: $$ (Spark Streaming, scheduled jobs)
- Use when: Operational metrics, near-real-time analytics

Batch (hourly/daily freshness):
- Cost: $ (Airflow, cron jobs)
- Use when: Reporting, historical analysis
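A rough rule of thumb links these tiers to schedules. With a scheduled job, the newest visible data can be as old as one schedule interval plus one run time. That worst case is reached just before the next run finishes. The rule assumes each run loads data up to its own start time and that runs never overlap; it ignores ingestion delay and late data. `worst_case_staleness` is a hypothetical helper, and the run times are example values.

```python
from datetime import timedelta

def worst_case_staleness(schedule_interval: timedelta, run_time: timedelta) -> timedelta:
    """Oldest the newest visible data can get under a fixed schedule.

    Assumes each run loads data up to its own start time and runs never overlap.
    """
    return schedule_interval + run_time

print(worst_case_staleness(timedelta(minutes=5), timedelta(minutes=2)))   # 0:07:00 (micro-batch)
print(worst_case_staleness(timedelta(hours=1), timedelta(minutes=15)))    # 1:15:00 (hourly batch)
print(worst_case_staleness(timedelta(days=1), timedelta(hours=2)))        # 1 day, 2:00:00 (daily)
```

For example, a 1-hour freshness target rules out an hourly schedule no matter how fast the job runs.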

Freshness vs. Completeness

```python
import time

# Trade-off: wait for all data vs. process what we have.
# fetch_data, is_complete, process, and alert are placeholders.

def process_with_timeout(timeout_seconds: int = 300, poll_interval_seconds: float = 5.0):
    """Process data with timeout to ensure freshness"""
    start_time = time.monotonic()
    data_buffer = []

    while time.monotonic() - start_time < timeout_seconds:
        data_buffer.extend(fetch_data())

        if is_complete(data_buffer):
            break  # Got all data

        time.sleep(poll_interval_seconds)  # don't hammer the source in a tight loop

    # Process what we have, even if incomplete
    if data_buffer:
        process(data_buffer)
    else:
        alert("No data received within timeout")
```

8. Freshness SLAs and SLOs

Defining SLOs

```yaml
# data_freshness_slos.yml
services:
  - name: user_events
    freshness_slo:
      target: 95       # 95% of data should be fresh...
      threshold: 5     # ...i.e. available within 5 minutes
      measurement_window: 1h

  - name: order_analytics
    freshness_slo:
      target: 99       # 99% of data should be fresh...
      threshold: 15    # ...i.e. available within 15 minutes
      measurement_window: 24h
```

Measuring SLO Compliance

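```python
def calculate_freshness_slo(table_name: str, threshold_minutes: int,
                            window_hours: int = 1, target_percent: float = 95.0):
    """Calculate % of rows in the window that became available within the threshold.

    Assumes each row has `created_at` (event time) and `loaded_at` (when it landed in
    the table); `loaded_at` is an assumed column. `db` is a placeholder DB-API style
    connection using %s parameters (PostgreSQL syntax); `table_name` must be trusted.
    """
    query = f"""
    SELECT
        100.0 * COUNT(*) FILTER (
            WHERE loaded_at - created_at <= make_interval(mins => %s)
        ) / NULLIF(COUNT(*), 0) AS freshness_percent
    FROM {table_name}
    WHERE created_at > NOW() - make_interval(hours => %s)
    """

    result = db.execute(query, (threshold_minutes, window_hours)).fetchone()
    freshness_percent = result[0]  # None when the window has no rows

    return {
        'table': table_name,
        'freshness_percent': freshness_percent,
        'threshold_minutes': threshold_minutes,
        'meets_slo': freshness_percent is not None and freshness_percent >= target_percent
    }
```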
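The function above computes a row-based percentage. Another common formulation is time-based. You sample the table's data age on a schedule, as the monitor in section 5 does, and report the share of checks that were within threshold. This is a minimal sketch; the sample ages are made up, and `time_based_compliance` is a hypothetical helper.

```python
def time_based_compliance(ages_minutes: list[float], threshold_minutes: float) -> float:
    """Percentage of periodic freshness checks where data age was within threshold."""
    if not ages_minutes:
        raise ValueError("no freshness checks in the measurement window")
    fresh = sum(1 for age in ages_minutes if age <= threshold_minutes)
    return 100.0 * fresh / len(ages_minutes)

# Twelve checks, one every 5 minutes over 1h, for a table with a 5-minute threshold
ages = [1, 2, 4, 3, 6, 9, 2, 1, 3, 4, 2, 5]
compliance = time_based_compliance(ages, threshold_minutes=5)
print(f"{compliance:.1f}% fresh -> meets 95% target: {compliance >= 95}")
# 83.3% fresh -> meets 95% target: False
```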

9. Tools for Freshness Monitoring

Monte Carlo Freshness Checks

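```yaml
# Monte Carlo automatically monitors freshness. Illustrative config only:
# check Monte Carlo's monitors-as-code documentation for the exact schema.
monitors:
  - type: freshness
    table: production.events
    field: created_at
    threshold: 10 minutes
    alert:
      - slack: "#data-alerts"  # quoted: an unquoted # starts a YAML comment
      - pagerduty: data-team
```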

Custom Grafana Dashboard

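```promql
# Freshness per table, assuming `event_timestamp` is a gauge holding the
# Unix time (seconds) of the newest event for each table
time() - max by (table) (event_timestamp)
```

```yaml
# Alert rule (Prometheus 2.x rule-file format)
groups:
  - name: data-freshness
    rules:
      - alert: DataStale
        expr: time() - max by (table) (event_timestamp) > 600  # 10 minutes
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Data is stale in {{ $labels.table }}"
          description: "Latest data is {{ $value }}s old"
```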

10. Handling Stale Data

Fallback to Cached Data

```python
def get_data_with_fallback(cache_ttl_minutes: int = 60):
    """Get fresh data or fall back to cache"""
    # fetch_from_warehouse, cache, logger, and DataUnavailableError are placeholders

    # Try to get fresh data
    fresh_data = fetch_from_warehouse()
    freshness = check_freshness(fresh_data, 'updated_at', max_age_minutes=10)

    if freshness['is_fresh']:
        # Update cache
        cache.set('latest_data', fresh_data, ttl=cache_ttl_minutes * 60)
        return fresh_data
    else:
        # Fall back to the last copy that passed the freshness check
        cached_data = cache.get('latest_data')
        if cached_data is not None:  # `if cached_data:` raises for a DataFrame
            logger.warning("Using cached data (warehouse data is stale)")
            return cached_data
        else:
            raise DataUnavailableError("No fresh or cached data available")
```

Display Staleness to Users

```python
def get_dashboard_data():
    """Get data with freshness indicator"""
    data = fetch_data()
    freshness = check_freshness(data, 'event_time')

    return {
        'data': data,
        'metadata': {
            'last_updated': freshness['latest_timestamp'],
            'age_minutes': freshness['age_minutes'],
            'is_fresh': freshness['is_fresh'],
            'warning': f"Data is {freshness['age_minutes']:.0f} minutes old" if not freshness['is_fresh'] else None
        }
    }
```

11. Real Freshness Issues

Case Study: The Stale Dashboard

  • Problem: Executive dashboard showing yesterday's revenue
  • Root Cause: ETL job failed at 2 AM, no alerting on freshness
  • Impact: Wrong business decisions made based on stale data
  • Solution: Added freshness monitoring with PagerDuty alerts
  • Prevention: Implemented SLO tracking and automated freshness tests

Case Study: The Slow Pipeline

  • Problem: Real-time fraud detection taking 5 minutes (SLO: < 1 second)
  • Root Cause: Single-threaded processing, no partitioning
  • Solution: Implemented Kafka partitioning and parallel consumers
  • Result: Latency reduced from 5 minutes to 200ms
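The fix in the second case study relies on a property of Kafka partitioning. Records with the same key go to the same partition, so per-key order survives while partitions are processed in parallel. This stdlib-only sketch imitates that with hash partitioning and a thread pool. `partition_for` and `process_partition` are hypothetical. `zlib.crc32` stands in for Kafka's default partitioner, which hashes keys with murmur2.

```python
import zlib
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor

NUM_PARTITIONS = 4

def partition_for(key: str) -> int:
    """Stable hash partitioning: the same key always maps to the same partition."""
    return zlib.crc32(key.encode("utf-8")) % NUM_PARTITIONS

def process_partition(events: list[tuple[str, int]]) -> dict[str, list[int]]:
    """Process one partition sequentially, preserving per-key order."""
    seen: dict[str, list[int]] = defaultdict(list)
    for card_id, amount in events:
        seen[card_id].append(amount)  # stand-in for scoring the transaction
    return seen

transactions = [("card-1", 10), ("card-2", 99), ("card-1", 25), ("card-3", 7), ("card-2", 5)]

partitions: dict[int, list[tuple[str, int]]] = defaultdict(list)
for card_id, amount in transactions:
    partitions[partition_for(card_id)].append((card_id, amount))

# One worker per partition, like one consumer per Kafka partition in a group
with ThreadPoolExecutor(max_workers=NUM_PARTITIONS) as pool:
    results = list(pool.map(process_partition, partitions.values()))

# Keys never span partitions, so merging the per-partition results is safe
merged = {k: v for part in results for k, v in part.items()}
print(merged["card-1"])  # [10, 25]
```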

12. Data Freshness Checklist

  • SLOs Defined: Do we have freshness SLOs for each critical table?
  • Monitoring: Are we continuously monitoring freshness?
  • Alerting: Do we get alerted when data goes stale?
  • Latency Tracking: Are we measuring P50/P95/P99 latencies?
  • Optimization: Have we optimized for our freshness requirements?
  • Fallbacks: Do we have fallback strategies for stale data?
  • User Communication: Do we show data age to end users?
  • SLO Compliance: Are we meeting our freshness SLOs > 95% of the time?

Related Skills

  • 43-data-reliability/data-quality-monitoring
  • 43-data-reliability/data-contracts
  • 42-cost-engineering/infra-sizing
