Agent skill
Data Quality Checks
Comprehensive guide to data quality validation, testing frameworks, anomaly detection, and data observability for production data pipelines
Stars
163
Forks
31
Install this agent skill to your Project
npx add-skill https://github.com/majiayu000/claude-skill-registry/tree/main/skills/data/data-quality-checks
SKILL.md
Data Quality Checks
What is Data Quality?
Data Quality: Ensuring data is accurate, complete, consistent, and reliable for business use.
Dimensions of Data Quality
Accuracy: Data is correct
Completeness: No missing values
Consistency: Same across systems
Timeliness: Data is fresh
Validity: Conforms to rules
Uniqueness: No duplicates
Why Data Quality Matters
- Bad decisions: Wrong data → wrong insights
- Broken pipelines: Invalid data breaks downstream
- Lost trust: Users stop trusting data
- Compliance: Regulations require data quality
Types of Data Quality Checks
Schema Validation
python
# Check column exists
assert "customer_id" in df.columns
# Check data type
assert df.schema["customer_id"].dataType == IntegerType()
# Check column count
assert len(df.columns) == 10
Completeness Checks
python
# No nulls in required columns
null_count = df.filter(col("customer_id").isNull()).count()
assert null_count == 0, f"Found {null_count} null customer_ids"
# Minimum row count
row_count = df.count()
assert row_count > 1000, f"Expected >1000 rows, got {row_count}"
Uniqueness Checks
python
# No duplicates
total_rows = df.count()
unique_rows = df.select("customer_id").distinct().count()
assert total_rows == unique_rows, f"Found {total_rows - unique_rows} duplicates"
Range Checks
python
# Values within expected range
invalid_ages = df.filter((col("age") < 0) | (col("age") > 120)).count()
assert invalid_ages == 0, f"Found {invalid_ages} invalid ages"
# Amount is positive
invalid_amounts = df.filter(col("amount") < 0).count()
assert invalid_amounts == 0, f"Found {invalid_amounts} negative amounts"
Referential Integrity
python
# Foreign key exists
orders_df = spark.read.table("orders")
customers_df = spark.read.table("customers")
orphan_orders = orders_df.join(
customers_df,
orders_df.customer_id == customers_df.customer_id,
"left_anti"
).count()
assert orphan_orders == 0, f"Found {orphan_orders} orders without customers"
Format Validation
python
# Email format
import re
email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
invalid_emails = df.filter(
~col("email").rlike(email_pattern)
).count()
assert invalid_emails == 0, f"Found {invalid_emails} invalid emails"
# Date format
invalid_dates = df.filter(
~col("order_date").cast("date").isNotNull()
).count()
assert invalid_dates == 0, f"Found {invalid_dates} invalid dates"
Great Expectations
Setup
python
from great_expectations.dataset import SparkDFDataset
# Wrap DataFrame
ge_df = SparkDFDataset(df)
Common Expectations
python
# Column exists
ge_df.expect_column_to_exist("customer_id")
# No nulls
ge_df.expect_column_values_to_not_be_null("customer_id")
# Unique values
ge_df.expect_column_values_to_be_unique("customer_id")
# Values in set
ge_df.expect_column_values_to_be_in_set(
"status",
["pending", "completed", "cancelled"]
)
# Range
ge_df.expect_column_values_to_be_between(
"age",
min_value=0,
max_value=120
)
# Regex
ge_df.expect_column_values_to_match_regex(
"email",
r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
)
# Row count
ge_df.expect_table_row_count_to_be_between(
min_value=1000,
max_value=1000000
)
Validation
python
# Run all expectations
results = ge_df.validate()
# Check if passed
if results["success"]:
print("All checks passed!")
else:
print("Some checks failed:")
for result in results["results"]:
if not result["success"]:
print(f" - {result['expectation_config']['expectation_type']}")
Data Docs
bash
# Generate HTML documentation
great_expectations docs build
dbt Tests
Schema Tests
yaml
# models/schema.yml
version: 2
models:
- name: customers
columns:
- name: customer_id
tests:
- unique
- not_null
- name: email
tests:
- not_null
- unique
- name: status
tests:
- accepted_values:
values: ['active', 'inactive', 'suspended']
- name: created_at
tests:
- not_null
- dbt_utils.expression_is_true:
expression: "<= current_date"
Custom Tests
sql
-- tests/assert_positive_revenue.sql
select *
from {{ ref('fct_orders') }}
where total_amount < 0
Relationships Test
yaml
models:
- name: orders
columns:
- name: customer_id
tests:
- relationships:
to: ref('customers')
field: customer_id
Anomaly Detection
Statistical Anomalies
python
from scipy import stats
import numpy as np
# Z-score (standard deviations from mean)
df_pd = df.toPandas()
z_scores = np.abs(stats.zscore(df_pd['amount']))
anomalies = df_pd[z_scores > 3] # >3 std devs
print(f"Found {len(anomalies)} anomalies")
Time Series Anomalies
python
# Detect sudden spikes/drops
daily_counts = df.groupBy("date").count().orderBy("date")
# Calculate moving average
from pyspark.sql.window import Window
window = Window.orderBy("date").rowsBetween(-7, 0)
daily_counts = daily_counts.withColumn(
"moving_avg",
avg("count").over(window)
)
# Flag anomalies (>2x moving average)
anomalies = daily_counts.filter(
col("count") > col("moving_avg") * 2
)
Null Rate Anomalies
python
# Track null rate over time
null_rates = df.groupBy("date").agg(
(sum(when(col("email").isNull(), 1).otherwise(0)) / count("*")).alias("null_rate")
)
# Alert if null rate > 5%
high_null_days = null_rates.filter(col("null_rate") > 0.05)
Data Observability
Freshness Checks
python
from datetime import datetime, timedelta
# Check data freshness
latest_timestamp = df.agg(max("created_at")).collect()[0][0]
now = datetime.now()
age = now - latest_timestamp
# Alert if data is >24 hours old
assert age < timedelta(hours=24), f"Data is {age} old (stale!)"
Volume Checks
python
# Check row count is within expected range
today_count = df.filter(col("date") == current_date()).count()
expected_min = 10000
expected_max = 100000
assert expected_min <= today_count <= expected_max, \
f"Row count {today_count} outside expected range [{expected_min}, {expected_max}]"
Distribution Checks
python
# Check value distribution
status_dist = df.groupBy("status").count().collect()
# Expected distribution
expected = {
"active": (0.7, 0.9), # 70-90%
"inactive": (0.05, 0.2), # 5-20%
"suspended": (0.0, 0.1) # 0-10%
}
total = df.count()
for row in status_dist:
status = row["status"]
count = row["count"]
pct = count / total
if status in expected:
min_pct, max_pct = expected[status]
assert min_pct <= pct <= max_pct, \
f"{status}: {pct:.1%} outside expected range [{min_pct:.1%}, {max_pct:.1%}]"
Data Quality Frameworks
Great Expectations
python
import great_expectations as ge
# Create context
context = ge.get_context()
# Create expectation suite
suite = context.create_expectation_suite("my_suite")
# Add expectations
validator = context.get_validator(
batch_request=batch_request,
expectation_suite_name="my_suite"
)
validator.expect_column_values_to_not_be_null("customer_id")
validator.expect_column_values_to_be_unique("customer_id")
# Save suite
validator.save_expectation_suite()
# Run validation
results = context.run_checkpoint(checkpoint_name="my_checkpoint")
dbt Tests
bash
# Run all tests
dbt test
# Run tests for specific model
dbt test --select customers
# Run tests in CI/CD
dbt test --fail-fast
Monte Carlo
python
# SaaS data observability platform
# Automatic anomaly detection
# No code required
Soda
yaml
# checks.yml
checks for customers:
- row_count > 1000
- missing_count(customer_id) = 0
- duplicate_count(customer_id) = 0
- invalid_count(email) = 0:
valid format: email
- freshness(created_at) < 24h
bash
# Run checks
soda scan -d my_datasource -c configuration.yml checks.yml
Implementing Data Quality Pipeline
Pipeline Structure
python
def run_quality_checks(df, checks):
"""Run data quality checks and return results"""
results = []
for check in checks:
try:
check(df)
results.append({
"check": check.__name__,
"status": "PASS",
"error": None
})
except AssertionError as e:
results.append({
"check": check.__name__,
"status": "FAIL",
"error": str(e)
})
return results
# Define checks
def check_no_nulls(df):
null_count = df.filter(col("customer_id").isNull()).count()
assert null_count == 0, f"Found {null_count} nulls"
def check_no_duplicates(df):
total = df.count()
unique = df.select("customer_id").distinct().count()
assert total == unique, f"Found {total - unique} duplicates"
def check_row_count(df):
count = df.count()
assert count > 1000, f"Expected >1000 rows, got {count}"
# Run checks
checks = [check_no_nulls, check_no_duplicates, check_row_count]
results = run_quality_checks(df, checks)
# Log results
for result in results:
if result["status"] == "FAIL":
print(f"❌ {result['check']}: {result['error']}")
else:
print(f"✅ {result['check']}")
# Fail pipeline if any check failed
if any(r["status"] == "FAIL" for r in results):
raise Exception("Data quality checks failed")
Quarantine Bad Data
python
# Separate good and bad data
good_data = df.filter(
col("customer_id").isNotNull() &
col("email").rlike(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$')
)
bad_data = df.subtract(good_data)
# Write good data to production
good_data.write.format("delta").mode("append").save("/prod/customers")
# Write bad data to quarantine
bad_data.write.format("delta").mode("append").save("/quarantine/customers")
# Alert on bad data
if bad_data.count() > 0:
send_alert(f"Found {bad_data.count()} bad records in quarantine")
Monitoring and Alerting
Metrics to Track
Freshness: How old is the data?
Volume: Row count per day
Completeness: % of non-null values
Uniqueness: % of unique values
Validity: % of values passing validation
Schema changes: Columns added/removed/changed
Alerting Rules
python
# Alert if data is stale
if data_age > timedelta(hours=24):
send_alert("Data is stale")
# Alert if volume drops >50%
if today_count < yesterday_count * 0.5:
send_alert("Volume dropped significantly")
# Alert if null rate increases >10%
if today_null_rate > yesterday_null_rate + 0.1:
send_alert("Null rate increased")
Dashboard
Metrics Dashboard:
- Data freshness (last updated)
- Row count trend (7 days)
- Null rate trend (7 days)
- Test pass rate (% passing)
- Failed tests (list)
Best Practices
1. Test Early and Often
Bronze layer: Schema validation
Silver layer: Completeness, uniqueness
Gold layer: Business logic validation
2. Fail Fast
python
# Stop pipeline if critical checks fail
if critical_check_failed:
raise Exception("Critical check failed, stopping pipeline")
3. Quarantine Bad Data
Don't drop bad data
→ Quarantine for investigation
→ Fix and reprocess
4. Monitor Trends
Track metrics over time
Alert on anomalies
Investigate root cause
5. Document Expectations
yaml
# Document what "good" data looks like
customers:
- customer_id: unique, not null
- email: valid format, unique
- age: 0-120
- status: active, inactive, suspended
Summary
Data Quality: Ensuring data is accurate, complete, consistent
Dimensions:
- Accuracy, Completeness, Consistency
- Timeliness, Validity, Uniqueness
Check Types:
- Schema validation
- Completeness checks
- Uniqueness checks
- Range checks
- Referential integrity
- Format validation
Frameworks:
- Great Expectations (Python)
- dbt tests (SQL)
- Soda (YAML)
- Monte Carlo (SaaS)
Anomaly Detection:
- Statistical (Z-score)
- Time series (moving average)
- Null rate tracking
Data Observability:
- Freshness checks
- Volume checks
- Distribution checks
Best Practices:
- Test early and often
- Fail fast on critical checks
- Quarantine bad data
- Monitor trends
- Document expectations
Didn't find tool you were looking for?