Agent skill
senior-data-engineer
Data engineering skill for building scalable data pipelines, ETL/ELT systems, and data infrastructure. Expertise in Python, SQL, Spark, Airflow, dbt, Kafka, and the modern data stack. Includes data modeling, pipeline orchestration, data quality, and DataOps. Use when designing data architectures, building data pipelines, optimizing data workflows, implementing data governance, or troubleshooting data issues.
Install this agent skill in your project:
npx add-skill https://github.com/borghei/Claude-Skills/tree/main/engineering/senior-data-engineer
Metadata
- tags: airflow, spark, data-pipelines, warehousing, etl
- author: borghei
- domain: data-engineering
- updated: 1774915200
- version: 1.0.0
- category: engineering
SKILL.md
Senior Data Engineer
Production-grade data engineering skill for building scalable, reliable data systems.
Table of Contents
- Trigger Phrases
- Quick Start
- Workflows
  - Building a Batch ETL Pipeline
  - Implementing Real-Time Streaming
  - Data Quality Framework Setup
- Architecture Decision Framework
- Tech Stack
- Reference Documentation
- Troubleshooting
Trigger Phrases
Activate this skill when you see:
Pipeline Design:
- "Design a data pipeline for..."
- "Build an ETL/ELT process..."
- "How should I ingest data from..."
- "Set up data extraction from..."
Architecture:
- "Should I use batch or streaming?"
- "Lambda vs Kappa architecture"
- "How to handle late-arriving data"
- "Design a data lakehouse"
Data Modeling:
- "Create a dimensional model..."
- "Star schema vs snowflake"
- "Implement slowly changing dimensions"
- "Design a data vault"
Data Quality:
- "Add data validation to..."
- "Set up data quality checks"
- "Monitor data freshness"
- "Implement data contracts"
Performance:
- "Optimize this Spark job"
- "Query is running slow"
- "Reduce pipeline execution time"
- "Tune Airflow DAG"
Quick Start
Core Tools
# Generate pipeline orchestration config
python scripts/pipeline_orchestrator.py generate \
--type airflow \
--source postgres \
--destination snowflake \
--schedule "0 5 * * *"
# Validate data quality (the validator reads CSV, JSON, or JSONL files)
python scripts/data_quality_validator.py validate data/sales.csv \
--schema schemas/sales.json \
--detect-anomalies
# Analyze a SQL query for optimization opportunities
python scripts/etl_performance_optimizer.py analyze-sql queries/daily_aggregation.sql \
--warehouse databricks \
--json
Workflows
Workflow 1: Building a Batch ETL Pipeline
Scenario: Extract data from PostgreSQL, load it into Snowflake, and transform it with dbt.
Step 1: Define Source Schema
-- Document source tables
SELECT
table_name,
column_name,
data_type,
is_nullable
FROM information_schema.columns
WHERE table_schema = 'source_schema'
ORDER BY table_name, ordinal_position;
Step 2: Generate Extraction Config
python scripts/pipeline_orchestrator.py generate \
--type airflow \
--source postgres \
--destination snowflake \
--tables orders,customers,products \
--mode incremental \
--output dags/extract_source.py
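Incremental mode extracts only the rows changed since the previous run, tracked by a watermark column such as updated_at. A minimal sketch of that high-water-mark logic, assuming a DB-API connection (sqlite3 here so it runs standalone) and an illustrative orders table; the DAG that pipeline_orchestrator.py generates may implement it differently.

```python
import sqlite3

def extract_incremental(conn, table, watermark_col, last_watermark):
    """Return rows whose watermark column is newer than last_watermark, plus the new mark.

    table and watermark_col are interpolated into SQL, so they must come from
    trusted pipeline configuration, never from user input.
    """
    cursor = conn.execute(
        f"SELECT * FROM {table} WHERE {watermark_col} > ? ORDER BY {watermark_col}",
        (last_watermark,),
    )
    columns = [d[0] for d in cursor.description]
    rows = [dict(zip(columns, values)) for values in cursor.fetchall()]
    # Advance the watermark only when new rows arrived; otherwise keep the old one
    new_watermark = rows[-1][watermark_col] if rows else last_watermark
    return rows, new_watermark

# Demo on an in-memory table; ISO-8601 UTC strings sort chronologically
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id INTEGER, updated_at TEXT)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?)",
    [(1, "2024-01-01T00:00:00Z"), (2, "2024-01-02T00:00:00Z"), (3, "2024-01-03T00:00:00Z")],
)
rows, wm = extract_incremental(conn, "orders", "updated_at", "2024-01-01T00:00:00Z")
print([r["order_id"] for r in rows], wm)
```

A strict > comparison can miss rows that share the watermark timestamp but commit after the extraction ran; a common alternative is >= combined with deduplication on the primary key downstream.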
Step 3: Create dbt Models
-- models/staging/stg_orders.sql
-- No date filter here: incremental filtering belongs in fct_orders,
-- so a full refresh of the mart still sees the complete history
WITH source AS (
    SELECT * FROM {{ source('postgres', 'orders') }}
),
renamed AS (
    SELECT
        order_id,
        customer_id,
        order_date,
        total_amount,
        status,
        _extracted_at
    FROM source
)
SELECT * FROM renamed
-- models/marts/fct_orders.sql
{{
config(
materialized='incremental',
unique_key='order_id',
cluster_by=['order_date']
)
}}
SELECT
    o.order_id,
    o.customer_id,
    c.customer_segment,
    o.order_date,
    o.total_amount,
    o.status,
    -- Selected so the incremental filter below can read MAX(_extracted_at) from the existing table
    o._extracted_at
FROM {{ ref('stg_orders') }} o
LEFT JOIN {{ ref('dim_customers') }} c
    ON o.customer_id = c.customer_id
{% if is_incremental() %}
WHERE o._extracted_at > (SELECT MAX(_extracted_at) FROM {{ this }})
{% endif %}
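With materialized='incremental' and unique_key='order_id', each run merges the new batch into the existing table: rows whose key already exists are replaced and new keys are inserted (on Snowflake, dbt's default incremental strategy is merge). The plain-Python model below only illustrates that upsert semantics and assumes keys are unique within each batch; it is not how dbt executes the merge.

```python
def merge_incremental(existing, batch, unique_key):
    """Upsert batch rows into existing rows keyed by unique_key (lists of dicts)."""
    merged = {row[unique_key]: row for row in existing}
    for row in batch:
        # Matched keys are overwritten (WHEN MATCHED THEN UPDATE),
        # unmatched keys are appended (WHEN NOT MATCHED THEN INSERT)
        merged[row[unique_key]] = row
    return list(merged.values())

existing = [{"order_id": 1, "status": "pending"}, {"order_id": 2, "status": "confirmed"}]
batch = [{"order_id": 2, "status": "shipped"}, {"order_id": 3, "status": "pending"}]
result = merge_incremental(existing, batch, "order_id")
print(result)
```

If a batch contained the same key twice, this sketch would let the last row win, whereas a warehouse MERGE can fail or behave nondeterministically, which is why sources should be deduplicated before the merge.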
Step 4: Configure Data Quality Tests
# models/marts/schema.yml
version: 2
models:
  - name: fct_orders
    description: "Order fact table"
    tests:
      # dbt_utils.recency is a model-level test, so it sits here rather than under a column
      - dbt_utils.recency:
          datepart: day
          field: order_date
          interval: 1
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: total_amount
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0
              max_value: 1000000
      - name: order_date
        tests:
          - not_null
Step 5: Create Airflow DAG
# dags/daily_etl.py
from datetime import timedelta
import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator
default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'email': ['data-alerts@company.com'],
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}
with DAG(
    'daily_etl_pipeline',
    default_args=default_args,
    description='Daily ETL from PostgreSQL to Snowflake',
    schedule='0 5 * * *',  # Airflow 2.4+; earlier versions use schedule_interval
    # Fixed, timezone-aware start date (airflow.utils.dates.days_ago is deprecated)
    start_date=pendulum.datetime(2024, 1, 1, tz='UTC'),
    catchup=False,
    tags=['etl', 'daily'],
) as dag:
    extract = BashOperator(
        task_id='extract_source_data',
        bash_command='python /opt/airflow/scripts/extract.py --date {{ ds }}',
    )
    transform = BashOperator(
        task_id='run_dbt_models',
        bash_command='cd /opt/airflow/dbt && dbt run --select marts.*',
    )
    test = BashOperator(
        task_id='run_dbt_tests',
        bash_command='cd /opt/airflow/dbt && dbt test --select marts.*',
    )
    notify = BashOperator(
        task_id='send_notification',
        bash_command='python /opt/airflow/scripts/notify.py --status success',
        trigger_rule='all_success',  # the default rule, shown explicitly
    )
    extract >> transform >> test >> notify
Step 6: Validate Pipeline
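# Test locally
dbt run --select stg_orders fct_orders
dbt test --select fct_orders
# Validate data quality on an export of fct_orders; the validator reads
# CSV, JSON, or JSONL files (exports/fct_orders.csv is an illustrative path)
python scripts/data_quality_validator.py validate exports/fct_orders.csv \
--detect-anomalies \
--output reports/quality_report.json \
--json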
Workflow 2: Implementing Real-Time Streaming
Scenario: Stream events from Kafka, process with Flink/Spark Streaming, sink to data lake.
Step 1: Define Event Schema
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "UserEvent",
"type": "object",
"required": ["event_id", "user_id", "event_type", "timestamp"],
"properties": {
"event_id": {"type": "string", "format": "uuid"},
"user_id": {"type": "string"},
"event_type": {"type": "string", "enum": ["page_view", "click", "purchase"]},
"timestamp": {"type": "string", "format": "date-time"},
"properties": {"type": "object"}
}
}
Step 2: Create Kafka Topic
# Create topic with appropriate partitions
kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--topic user-events \
--partitions 12 \
--replication-factor 3 \
--config retention.ms=604800000 \
--config cleanup.policy=delete
# Verify topic
kafka-topics.sh --describe \
--bootstrap-server localhost:9092 \
--topic user-events
Step 3: Implement Spark Streaming Job
# streaming/user_events_processor.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
from_json, col, window, count,
approx_count_distinct, to_timestamp
)
from pyspark.sql.types import (
StructType, StructField, StringType, MapType
)
# Initialize Spark
spark = SparkSession.builder \
.appName("UserEventsProcessor") \
.config("spark.sql.streaming.checkpointLocation", "/checkpoints/user-events") \
.config("spark.sql.shuffle.partitions", "12") \
.getOrCreate()
# Define schema
event_schema = StructType([
StructField("event_id", StringType(), False),
StructField("user_id", StringType(), False),
StructField("event_type", StringType(), False),
StructField("timestamp", StringType(), False),
StructField("properties", MapType(StringType(), StringType()), True)
])
# Read from Kafka
events_df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "user-events") \
.option("startingOffsets", "latest") \
.option("failOnDataLoss", "false") \
.load()
# Parse JSON
parsed_df = events_df \
.select(from_json(col("value").cast("string"), event_schema).alias("data")) \
.select("data.*") \
.withColumn("event_timestamp", to_timestamp(col("timestamp")))
# Windowed aggregation
aggregated_df = parsed_df \
.withWatermark("event_timestamp", "10 minutes") \
.groupBy(
window(col("event_timestamp"), "5 minutes"),
col("event_type")
) \
.agg(
count("*").alias("event_count"),
approx_count_distinct("user_id").alias("unique_users")
)
# Write to Delta Lake
query = aggregated_df.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", "/checkpoints/user-events-aggregated") \
.option("path", "/data/lake/user_events_aggregated") \
.trigger(processingTime="1 minute") \
.start()
query.awaitTermination()
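The watermark decides when a 5-minute window is final and which late events are ignored. The sketch below models that behavior in plain Python under simplifying assumptions: event times are integer seconds, and the watermark (maximum event time seen minus 10 minutes) advances after every event, whereas Spark advances it once per micro-batch. It illustrates the semantics of append mode; it is not Spark's implementation.

```python
from collections import defaultdict

WINDOW_SECONDS = 5 * 60     # window(col("event_timestamp"), "5 minutes")
WATERMARK_DELAY = 10 * 60   # withWatermark("event_timestamp", "10 minutes")

def simulate_windows(events):
    """events: (event_time_seconds, event_type) in arrival order.

    Returns (emitted, dropped, still_open); window keys are (window_start, event_type).
    """
    open_windows = defaultdict(int)
    emitted, dropped = {}, []
    max_event_time = None
    for ts, event_type in events:
        window_start = ts - ts % WINDOW_SECONDS
        watermark = None if max_event_time is None else max_event_time - WATERMARK_DELAY
        # A window whose end is at or before the watermark is already final,
        # so a late event belonging to it is dropped
        if watermark is not None and window_start + WINDOW_SECONDS <= watermark:
            dropped.append((ts, event_type))
            continue
        open_windows[(window_start, event_type)] += 1
        max_event_time = ts if max_event_time is None else max(max_event_time, ts)
        watermark = max_event_time - WATERMARK_DELAY
        # Append mode: emit each window once, when the watermark passes its end
        for key in [k for k in open_windows if k[0] + WINDOW_SECONDS <= watermark]:
            emitted[key] = open_windows.pop(key)
    return emitted, dropped, dict(open_windows)

emitted, dropped, still_open = simulate_windows(
    [(0, "click"), (60, "click"), (900, "page_view"), (120, "click")]
)
print(emitted, dropped, still_open)
```

The event at t=900 pushes the watermark to 300, which finalizes the first window; the click at t=120 then arrives too late and is discarded.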
Step 4: Handle Late Data and Errors
# Dead letter queue for failed records
import logging
from pyspark.sql.functions import col, current_timestamp, lit
logger = logging.getLogger(__name__)
def process_with_error_handling(batch_df, batch_id):
    # Cache the micro-batch: it is filtered twice and counted below
    batch_df.persist()
    try:
        valid_df = batch_df.filter(col("event_id").isNotNull())
        invalid_df = batch_df.filter(col("event_id").isNull())
        # Write valid records
        valid_df.write \
            .format("delta") \
            .mode("append") \
            .save("/data/lake/user_events")
        # Write invalid records to the DLQ with error context
        if invalid_df.count() > 0:
            invalid_df \
                .withColumn("error_timestamp", current_timestamp()) \
                .withColumn("error_reason", lit("missing_event_id")) \
                .write \
                .format("delta") \
                .mode("append") \
                .save("/data/lake/dlq/user_events")
    except Exception as e:
        # Log and re-raise: the query stops and, on restart, reprocesses this
        # batch from the last checkpoint instead of silently skipping it
        logger.error(f"Batch {batch_id} failed: {e}")
        raise
    finally:
        batch_df.unpersist()
# Use foreachBatch for custom processing
query = parsed_df.writeStream \
    .foreachBatch(process_with_error_handling) \
    .option("checkpointLocation", "/checkpoints/user-events") \
    .start()
Step 5: Monitor Stream Health
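# monitoring/stream_metrics.py
from prometheus_client import Counter, Gauge, start_http_server
# Define metrics
RECORDS_PROCESSED = Counter(
    'stream_records_processed_total',
    'Total records processed',
    ['stream_name', 'status']
)
PROCESSING_LAG = Gauge(
    'stream_processing_lag_seconds',
    'Current processing lag',
    ['stream_name']
)
BATCH_DURATION = Gauge(
    'stream_batch_duration_seconds',
    'Last batch processing duration',
    ['stream_name']
)
def emit_metrics(query, stream_name='user-events'):
    """Emit Prometheus metrics from a streaming query's last progress report.
    Call start_http_server(<port>) once at startup to expose the metrics, and
    call this once per completed micro-batch, or numInputRows is counted twice.
    """
    progress = query.lastProgress
    if not progress:
        return
    RECORDS_PROCESSED.labels(
        stream_name=stream_name,
        status='success'
    ).inc(progress['numInputRows'])
    # durationMs.triggerExecution is the wall-clock time of the last trigger
    BATCH_DURATION.labels(stream_name=stream_name).set(
        progress['durationMs'].get('triggerExecution', 0) / 1000
    )
    for source in progress.get('sources', []):
        # For the Kafka source, endOffset is {topic: {partition: offset}};
        # compare it with the broker's latest offsets to derive lag
        end_offset = source.get('endOffset', {})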
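The lag calculation that emit_metrics leaves open compares the offsets Spark has processed with the latest offsets on the broker. A minimal sketch, assuming both inputs are shaped like the Kafka source's endOffset in query.lastProgress ({topic: {partition: offset}}, partition ids as strings) and that the latest offsets are fetched from the broker elsewhere; the result is a message count, not seconds, so it would suit a separate gauge rather than PROCESSING_LAG.

```python
def kafka_lag(processed, latest):
    """Lag in messages per (topic, partition) and in total.

    Both arguments are {topic: {partition: offset}} dicts. `processed` can be a
    Kafka source's endOffset; `latest` must come from the broker (fetching it is
    left out so the function stays pure).
    """
    per_partition = {}
    for topic, partitions in latest.items():
        done = processed.get(topic, {})
        for partition, latest_offset in partitions.items():
            # A partition Spark has not read yet is approximated as lagging from offset 0
            lag = latest_offset - done.get(partition, 0)
            per_partition[(topic, partition)] = max(lag, 0)
    return per_partition, sum(per_partition.values())

per_partition, total = kafka_lag(
    {"user-events": {"0": 100, "1": 250}},
    {"user-events": {"0": 120, "1": 250, "2": 30}},
)
print(per_partition, total)
```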
Workflow 3: Data Quality Framework Setup
Scenario: Implement comprehensive data quality monitoring with Great Expectations.
Step 1: Initialize Great Expectations
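# Install and initialize (the CLI and the Python API used below belong to the
# pre-1.0 releases of Great Expectations; 1.0 removed the CLI)
pip install "great_expectations<1.0"
great_expectations init
# Connect to data source
great_expectations datasource new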
Step 2: Create Expectation Suite
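# expectations/orders_suite.py
import great_expectations as gx
from great_expectations.core.batch import BatchRequest
context = gx.get_context()
# Create expectation suite
suite = context.add_expectation_suite("orders_quality_suite")
# Add expectations
validator = context.get_validator(
    batch_request=BatchRequest(
        datasource_name="warehouse",
        # Connector name as generated by the CLI's SQL datasource setup; adjust to your config
        data_connector_name="default_inferred_data_connector_name",
        data_asset_name="orders",
    ),
    expectation_suite_name="orders_quality_suite"
)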
# Schema expectations
validator.expect_table_columns_to_match_ordered_list(
column_list=[
"order_id", "customer_id", "order_date",
"total_amount", "status", "created_at"
]
)
# Completeness expectations
validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_not_be_null("customer_id")
validator.expect_column_values_to_not_be_null("order_date")
# Uniqueness expectations
validator.expect_column_values_to_be_unique("order_id")
# Range expectations
validator.expect_column_values_to_be_between(
"total_amount",
min_value=0,
max_value=1000000
)
# Categorical expectations
validator.expect_column_values_to_be_in_set(
"status",
["pending", "confirmed", "shipped", "delivered", "cancelled"]
)
# Freshness expectation (evaluation parameters support now() and timedelta)
validator.expect_column_max_to_be_between(
    "order_date",
    min_value={"$PARAMETER": "now() - timedelta(days=1)"},
    max_value={"$PARAMETER": "now()"}
)
# Referential integrity: valid_customer_ids must be supplied as an
# evaluation parameter when the suite is run
validator.expect_column_values_to_be_in_set(
    "customer_id",
    value_set={"$PARAMETER": "valid_customer_ids"}
)
validator.save_expectation_suite(discard_failed_expectations=False)
Step 3: Create Data Quality Checks with dbt
# models/marts/schema.yml
version: 2
models:
  - name: fct_orders
    description: "Order fact table with data quality checks"
    tests:
      # Row count check
      - dbt_utils.equal_rowcount:
          compare_model: ref('stg_orders')
      # Freshness check
      - dbt_utils.recency:
          datepart: hour
          field: created_at
          interval: 24
    columns:
      - name: order_id
        description: "Unique order identifier"
        tests:
          - unique
          - not_null
          - relationships:
              to: ref('dim_orders')
              field: order_id
      - name: total_amount
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0
              max_value: 1000000
              inclusive: true
          - dbt_expectations.expect_column_values_to_be_between:
              min_value: 0
              row_condition: "status != 'cancelled'"
      - name: customer_id
        tests:
          - not_null
          - relationships:
              to: ref('dim_customers')
              field: customer_id
              config:
                severity: warn
Step 4: Implement Data Contracts
# contracts/orders_contract.yaml
contract:
  name: orders_data_contract
  version: "1.0.0"
  owner: data-team@company.com
  schema:
    type: object
    properties:
      order_id:
        type: string
        format: uuid
        description: "Unique order identifier"
      customer_id:
        type: string
        not_null: true
      order_date:
        type: date
        not_null: true
      total_amount:
        type: decimal
        precision: 10
        scale: 2
        minimum: 0
      status:
        type: string
        enum: ["pending", "confirmed", "shipped", "delivered", "cancelled"]
  sla:
    freshness:
      max_delay_hours: 1
    completeness:
      min_percentage: 99.9
    accuracy:
      duplicate_tolerance: 0.01
  consumers:
    - name: analytics-team
      usage: "Daily reporting dashboards"
    - name: ml-team
      usage: "Churn prediction model"
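To make a contract like this enforceable, a job can check records against its field rules and completeness SLA. A minimal pure-Python sketch, assuming the YAML has already been loaded into dicts (for example with PyYAML) and covering only not_null, enum, minimum, and sla.completeness.min_percentage; completeness is taken here to mean the share of not_null fields that are populated, which is an assumed definition, and data_quality_validator.py contract may behave differently.

```python
def validate_contract(records, properties, min_completeness_pct):
    """Check records (list of dicts) against contract field rules.

    properties mirrors schema.properties; min_completeness_pct mirrors
    sla.completeness.min_percentage.
    """
    violations = []
    required = {name for name, rule in properties.items() if rule.get("not_null")}
    checked = present = 0
    for i, record in enumerate(records):
        for name, rule in properties.items():
            value = record.get(name)
            if name in required:
                checked += 1
                if value is None:
                    violations.append((i, name, "null"))
                    continue
                present += 1
            if value is None:
                continue  # optional field left empty
            if "enum" in rule and value not in rule["enum"]:
                violations.append((i, name, "not in enum"))
            if "minimum" in rule and value < rule["minimum"]:
                violations.append((i, name, "below minimum"))
    completeness = 100.0 * present / checked if checked else 100.0
    return {
        "violations": violations,
        "completeness_pct": completeness,
        "completeness_ok": completeness >= min_completeness_pct,
    }

properties = {
    "order_id": {"type": "string"},
    "customer_id": {"type": "string", "not_null": True},
    "total_amount": {"type": "decimal", "minimum": 0},
    "status": {"type": "string", "enum": ["pending", "confirmed", "shipped", "delivered", "cancelled"]},
}
records = [
    {"order_id": "a", "customer_id": "c1", "total_amount": 10, "status": "pending"},
    {"order_id": "b", "customer_id": None, "total_amount": -5, "status": "lost"},
]
result = validate_contract(records, properties, min_completeness_pct=99.9)
print(result)
```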
Step 5: Set Up Quality Monitoring Dashboard
# monitoring/quality_dashboard.py
from datetime import datetime
def generate_quality_report(connection, table_name: str) -> dict:
    """Generate a data quality report for an orders-style table.
    connection is any DB-API-style object whose execute() returns a cursor
    (e.g. sqlite3.Connection); table_name is interpolated into SQL, so it
    must come from trusted configuration, never from user input.
    """
    report = {
        "table": table_name,
        "timestamp": datetime.now().isoformat(),
        "checks": {}
    }
    # Row count check
    row_count = connection.execute(
        f"SELECT COUNT(*) FROM {table_name}"
    ).fetchone()[0]
    report["checks"]["row_count"] = {
        "value": row_count,
        "status": "pass" if row_count > 0 else "fail"
    }
    # Freshness check (assumes the driver returns created_at as a naive datetime)
    max_date = connection.execute(
        f"SELECT MAX(created_at) FROM {table_name}"
    ).fetchone()[0]
    if max_date is None:
        # Empty table or all-NULL created_at: nothing fresh to report
        report["checks"]["freshness"] = {"max_timestamp": None, "status": "fail"}
    else:
        hours_old = (datetime.now() - max_date).total_seconds() / 3600
        report["checks"]["freshness"] = {
            "max_timestamp": max_date.isoformat(),
            "hours_old": round(hours_old, 2),
            "status": "pass" if hours_old < 24 else "fail"
        }
    # Null rate check (SUM returns NULL on an empty table, hence "or 0")
    null_query = f"""
        SELECT
            SUM(CASE WHEN order_id IS NULL THEN 1 ELSE 0 END) AS null_order_id,
            SUM(CASE WHEN customer_id IS NULL THEN 1 ELSE 0 END) AS null_customer_id,
            COUNT(*) AS total
        FROM {table_name}
    """
    null_order_id, null_customer_id, total = connection.execute(null_query).fetchone()
    null_order_id, null_customer_id = null_order_id or 0, null_customer_id or 0
    report["checks"]["null_rates"] = {
        "order_id": null_order_id / total if total > 0 else 0,
        "customer_id": null_customer_id / total if total > 0 else 0,
        "status": "pass" if null_order_id == 0 and null_customer_id == 0 else "fail"
    }
    # Duplicate check
    dup_query = f"""
        SELECT COUNT(*) - COUNT(DISTINCT order_id) AS duplicates
        FROM {table_name}
    """
    duplicates = connection.execute(dup_query).fetchone()[0]
    report["checks"]["duplicates"] = {
        "count": duplicates,
        "status": "pass" if duplicates == 0 else "fail"
    }
    # Overall status
    all_passed = all(
        check["status"] == "pass"
        for check in report["checks"].values()
    )
    report["overall_status"] = "pass" if all_passed else "fail"
    return report
Architecture Decision Framework
Use this framework to choose the right approach for your data pipeline.
Batch vs Streaming
| Criteria | Batch | Streaming |
|---|---|---|
| Latency requirement | Hours to days | Seconds to minutes |
| Data volume | Large historical datasets | Continuous event streams |
| Processing complexity | Complex transformations, ML | Simple aggregations, filtering |
| Cost sensitivity | More cost-effective | Higher infrastructure cost |
| Error handling | Easier to reprocess | Requires careful design |
Decision Tree:
Is real-time insight required?
├── Yes → Use streaming
│   └── Is exactly-once semantics needed?
│       ├── Yes → Kafka + Flink/Spark Structured Streaming
│       └── No → Kafka + consumer groups
└── No → Use batch
    └── Is data volume > 1TB daily?
        ├── Yes → Spark/Databricks
        └── No → dbt + warehouse compute
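For a design checklist or an automated architecture review, the same tree can be encoded as a function. A minimal sketch; the 1 TB threshold is read as strictly greater than 1 TB per day, and the returned labels are the tree's leaves.

```python
def choose_pipeline(realtime_required, exactly_once_needed=False, daily_volume_tb=0.0):
    """Walk the batch-vs-streaming decision tree and return the recommended stack."""
    if realtime_required:
        if exactly_once_needed:
            return "Kafka + Flink/Spark Structured Streaming"
        return "Kafka + consumer groups"
    if daily_volume_tb > 1:
        return "Spark/Databricks"
    return "dbt + warehouse compute"

print(choose_pipeline(False, daily_volume_tb=5))
```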
Lambda vs Kappa Architecture
| Aspect | Lambda | Kappa |
|---|---|---|
| Complexity | Two codebases (batch + stream) | Single codebase |
| Maintenance | Higher (sync batch/stream logic) | Lower |
| Reprocessing | Native batch layer | Replay from source |
| Use case | ML training + real-time serving | Pure event-driven |
When to choose Lambda:
- Need to train ML models on historical data
- Complex batch transformations not feasible in streaming
- Existing batch infrastructure
When to choose Kappa:
- Event-sourced architecture
- All processing can be expressed as stream operations
- Starting fresh without legacy systems
Data Warehouse vs Data Lakehouse
| Feature | Warehouse (Snowflake/BigQuery) | Lakehouse (Delta/Iceberg) |
|---|---|---|
| Best for | BI, SQL analytics | ML, unstructured data |
| Storage cost | Higher (proprietary format) | Lower (open formats) |
| Flexibility | Schema-on-write | Schema-on-read |
| Performance | Excellent for SQL | Good, improving |
| Ecosystem | Mature BI tools | Growing ML tooling |
Tech Stack
| Category | Technologies |
|---|---|
| Languages | Python, SQL, Scala |
| Orchestration | Airflow, Prefect, Dagster |
| Transformation | dbt, Spark, Flink |
| Streaming | Kafka, Kinesis, Pub/Sub |
| Storage | S3, GCS, Delta Lake, Iceberg |
| Warehouses | Snowflake, BigQuery, Redshift, Databricks |
| Quality | Great Expectations, dbt tests, Monte Carlo |
| Monitoring | Prometheus, Grafana, Datadog |
Reference Documentation
1. Data Pipeline Architecture
See references/data_pipeline_architecture.md for:
- Lambda vs Kappa architecture patterns
- Batch processing with Spark and Airflow
- Stream processing with Kafka and Flink
- Exactly-once semantics implementation
- Error handling and dead letter queues
2. Data Modeling Patterns
See references/data_modeling_patterns.md for:
- Dimensional modeling (Star/Snowflake)
- Slowly Changing Dimensions (SCD Types 1-6)
- Data Vault modeling
- dbt best practices
- Partitioning and clustering
3. DataOps Best Practices
See references/dataops_best_practices.md for:
- Data testing frameworks
- Data contracts and schema validation
- CI/CD for data pipelines
- Observability and lineage
- Incident response
Troubleshooting
Pipeline Failures
Symptom: Airflow DAG fails with timeout
Task exceeded max execution time
Solution:
- Check resource allocation
- Profile slow operations
- Add incremental processing
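# Increase timeout
from datetime import timedelta
default_args = {
    'execution_timeout': timedelta(hours=2),
}
-- Or use incremental loads (Airflow-templated extraction SQL;
-- prev_ds is deprecated in favor of the data interval variables)
WHERE updated_at >= '{{ data_interval_start }}'
  AND updated_at < '{{ data_interval_end }}'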
Symptom: Spark job OOM
java.lang.OutOfMemoryError: Java heap space
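Solution:
- Increase executor memory
- Increase shuffle partitions so each partition is smaller
- Raise spark.memory.fraction so execution and storage get a larger share of the heap (Spark spills to disk less often)
# Executor memory and memory fraction are fixed when the application starts:
# set them at submit time (spark-submit --conf) or when building a new session
spark = SparkSession.builder \
    .config("spark.executor.memory", "8g") \
    .config("spark.memory.fraction", "0.8") \
    .getOrCreate()
# Shuffle partitions can be changed at runtime; the default is 200, so raise it
# (400 is an illustrative value) to make each shuffle partition smaller
spark.conf.set("spark.sql.shuffle.partitions", "400")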
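Rather than guessing spark.sql.shuffle.partitions, a common heuristic sizes it from the shuffle volume so each partition lands near a target size. A sketch assuming a 128 MB target (a widely used rule of thumb, not a Spark default) and a shuffle size read from the Spark UI; on Spark 3.2+, Adaptive Query Execution is enabled by default and can also coalesce small shuffle partitions automatically.

```python
import math

def shuffle_partitions(shuffle_bytes, target_partition_mb=128):
    """Number of shuffle partitions that keeps each one near target_partition_mb."""
    target_bytes = target_partition_mb * 1024 * 1024
    # Round up so no partition exceeds the target; always return at least 1
    return max(1, math.ceil(shuffle_bytes / target_bytes))

# Example: a stage that shuffles 100 GiB
print(shuffle_partitions(100 * 1024**3))
```

The result can be passed to spark.conf.set("spark.sql.shuffle.partitions", str(n)) before the stage runs.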
Symptom: Kafka consumer lag increasing
Consumer lag: 1000000 messages
Solution:
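- Increase consumer parallelism (a consumer group assigns each partition to at most one consumer, so consumers beyond the partition count sit idle)
- Optimize processing logic
- Scale consumer group
# Add more partitions (keyed messages map to different partitions afterwards; the count cannot be reduced later)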
kafka-topics.sh --alter \
--bootstrap-server localhost:9092 \
--topic user-events \
--partitions 24
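Adding partitions changes where keyed messages land, because a key's partition is derived from hash(key) mod partition_count. The sketch below uses zlib.crc32 as a stand-in hash (the Java client's default partitioner hashes keys with murmur2, so real assignments differ) to show that going from 12 to 24 partitions leaves each key either in its old partition or moves it to old + 12, and relocates a large share of keys; per-key ordering across the change is therefore not guaranteed.

```python
import zlib

def partition_for(key, num_partitions):
    """Stand-in key-hash partitioner (crc32 here, not Kafka's murmur2)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions

keys = [f"user-{i}" for i in range(1000)]
before = {k: partition_for(k, 12) for k in keys}
after = {k: partition_for(k, 24) for k in keys}
moved = [k for k in keys if before[k] != after[k]]
print(f"{len(moved)} of {len(keys)} keys changed partition")
```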
Data Quality Issues
Symptom: Duplicate records appearing
Expected unique, found 150 duplicates
Solution:
- Add deduplication logic
- Use merge/upsert operations
-- dbt incremental with dedup
{{
config(
materialized='incremental',
unique_key='order_id'
)
}}
SELECT * FROM (
    SELECT
        *,
        ROW_NUMBER() OVER (
            PARTITION BY order_id
            ORDER BY updated_at DESC
        ) AS rn
    FROM {{ source('raw', 'orders') }}
) AS deduped
-- Keep only the latest version of each order
WHERE rn = 1
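The ROW_NUMBER() pattern keeps the most recent row per order_id. The same rule in plain Python, as a minimal sketch for unit-testing dedup expectations on small samples; it assumes updated_at values compare chronologically (ISO-8601 strings in one time zone do).

```python
def dedupe_latest(rows, key="order_id", order_by="updated_at"):
    """Keep one row per key: the one with the greatest order_by value (rn = 1)."""
    latest = {}
    for row in rows:
        current = latest.get(row[key])
        # Strict > keeps the first-seen row on ties; ROW_NUMBER() picks arbitrarily among ties
        if current is None or row[order_by] > current[order_by]:
            latest[row[key]] = row
    return list(latest.values())

rows = [
    {"order_id": 1, "updated_at": "2024-01-01T00:00:00Z", "status": "pending"},
    {"order_id": 1, "updated_at": "2024-01-03T00:00:00Z", "status": "shipped"},
    {"order_id": 2, "updated_at": "2024-01-02T00:00:00Z", "status": "pending"},
]
print(dedupe_latest(rows))
```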
Symptom: Stale data in tables
Last update: 3 days ago
Solution:
- Check upstream pipeline status
- Verify source availability
- Add freshness monitoring
# dbt freshness check (run with: dbt source freshness)
sources:
  - name: raw
    freshness:
      warn_after: {count: 12, period: hour}
      error_after: {count: 24, period: hour}
    loaded_at_field: _loaded_at
    tables:
      - name: orders
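The warn_after and error_after thresholds translate into a simple age comparison. A minimal sketch of that classification for custom monitors outside dbt, assuming timezone-aware UTC datetimes; whether an age exactly equal to a threshold counts as stale is an assumption of this sketch.

```python
from datetime import datetime, timedelta, timezone

def freshness_status(max_loaded_at, now,
                     warn_after=timedelta(hours=12), error_after=timedelta(hours=24)):
    """Return "pass", "warn", or "error" based on the age of the newest load."""
    age = now - max_loaded_at
    if age > error_after:
        return "error"
    if age > warn_after:
        return "warn"
    return "pass"

now = datetime(2024, 1, 15, 12, 0, tzinfo=timezone.utc)
for hours in (3, 13, 72):
    print(hours, freshness_status(now - timedelta(hours=hours), now))
```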
Symptom: Schema drift detected
Column 'new_field' not in expected schema
Solution:
- Update data contract
- Modify transformations
- Communicate with producers
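# Handle schema evolution: mergeSchema is a Delta write option that adds new
# columns to the table schema (incoming_df is an illustrative name for the new batch)
incoming_df.write.format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .save("/data/orders")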
Performance Issues
Symptom: Query takes hours
Query runtime: 4 hours (expected: 30 minutes)
Solution:
- Check query plan
- Add proper partitioning
- Optimize joins
-- Before: table is not partitioned, so this filter scans every row
SELECT * FROM orders WHERE order_date = '2024-01-15';
-- After: with the table partitioned by order_date, the same query
-- reads only the 2024-01-15 partition (partition pruning)
SELECT * FROM orders WHERE order_date = '2024-01-15';
-- Add clustering for frequent filters (Snowflake syntax)
ALTER TABLE orders CLUSTER BY (customer_id);
Symptom: dbt model takes too long
Model fct_orders completed in 45 minutes
Solution:
- Use incremental materialization
- Reduce upstream dependencies
- Pre-aggregate where possible
-- Convert to incremental
{{
config(
materialized='incremental',
unique_key='order_id',
on_schema_change='sync_all_columns'
)
}}
SELECT * FROM {{ ref('stg_orders') }}
{% if is_incremental() %}
WHERE _loaded_at > (SELECT MAX(_loaded_at) FROM {{ this }})
{% endif %}
Troubleshooting Quick Reference
| Problem | Cause | Solution |
|---|---|---|
| Pipeline silently produces zero rows | Incremental watermark column has timezone mismatch between source and warehouse | Normalize all timestamps to UTC at extraction; add a row-count assertion (dbt_utils.equal_rowcount) after every incremental load |
| Spark shuffle stage takes 10x longer than expected | Heavy data skew on the join key (a few keys hold most rows) | Salt the skewed key (CONCAT(key, '_', CAST(FLOOR(RAND() * 10) AS STRING))) and replicate the smaller table once per salt value, or use a broadcast join for the smaller table |
| Airflow scheduler marks DAG as "no tasks to run" | Circular or broken dependency reference in the DAG definition | Run airflow dags list-import-errors and fix the import; use pipeline_orchestrator.py validate --dag <file> --type airflow |
| dbt run succeeds but downstream dashboards show stale data | Source freshness not checked before transformation begins | Add dbt source freshness as a prerequisite task in the DAG; define freshness.warn_after and error_after in sources.yml |
| Kafka consumer lag grows unbounded | Consumer throughput is lower than producer rate; partition count too low | Increase partitions, scale the consumer group, and tune max.poll.records; track lag with kafka-consumer-groups.sh --describe --group <group> and profile output tables with data_quality_validator.py profile |
| Data quality validator reports false-positive anomalies | Default z-score threshold (3.0) is too tight for heavy-tailed distributions | Raise the z-threshold or switch to IQR mode with a higher multiplier; re-run data_quality_validator.py validate --detect-anomalies |
| Cost estimate differs significantly from actual cloud bill | The tool uses heuristic estimates without live warehouse metadata | Treat etl_performance_optimizer.py estimate-cost output as a directional guide; cross-reference with the warehouse query history view |
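Salting spreads a hot join key across several synthetic keys: each row on the large, skewed side gets a random salt suffix, and the small side is replicated once per salt value so every salted key still finds its match. A pure-Python sketch of the idea, assuming the small side has unique keys and using 10 salt buckets to match the table above; in Spark the same steps map to adding a salt column on one side and exploding the other.

```python
import random

SALT_BUCKETS = 10

def salt_large_side(rows, key, rng):
    """Append a random salt in [0, SALT_BUCKETS) to each large-side join key."""
    return [{**row, "salted_key": f"{row[key]}_{rng.randrange(SALT_BUCKETS)}"} for row in rows]

def replicate_small_side(rows, key):
    """Emit one copy of each small-side row per salt value."""
    return [{**row, "salted_key": f"{row[key]}_{salt}"}
            for row in rows for salt in range(SALT_BUCKETS)]

def salted_join(large, small, key, seed=0):
    """Inner join on the salted key; the hot key is now spread over many buckets."""
    rng = random.Random(seed)
    lookup = {r["salted_key"]: r for r in replicate_small_side(small, key)}
    joined = []
    for row in salt_large_side(large, key, rng):
        match = lookup.get(row["salted_key"])
        if match is not None:
            joined.append({**match, **row})
    return joined

large = ([{"customer_id": "hot", "amount": i} for i in range(1000)]
         + [{"customer_id": "cold", "amount": i} for i in range(10)])
small = [{"customer_id": "hot", "segment": "A"}, {"customer_id": "cold", "segment": "B"}]
joined = salted_join(large, small, "customer_id")
print(len(joined), len({r["salted_key"] for r in joined if r["customer_id"] == "hot"}))
```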
Success Criteria
- Pipeline SLA above 99.5% -- fewer than 2 unplanned failures per month across all production DAGs.
- Data freshness under 15 minutes for streaming pipelines and under 2 hours for batch pipelines measured at the mart layer.
- Data quality score >= 95% on completeness, uniqueness, validity, consistency, and accuracy (as reported by data_quality_validator.py).
- Zero duplicate records in all fact and dimension tables, enforced by primary key merge/upsert logic and dbt uniqueness tests.
- Query optimization savings >= 30% in compute cost or execution time after applying recommendations from etl_performance_optimizer.py analyze-sql.
- Schema drift detected within one pipeline run -- all contract violations surfaced automatically before data reaches the mart layer.
- Incident MTTR under 30 minutes for P1 pipeline failures with documented runbooks referencing the troubleshooting table above.
Scope & Limitations
This skill covers:
- End-to-end batch and streaming pipeline design (Airflow, Prefect, Dagster, Kafka, Spark)
- Data quality validation, profiling, anomaly detection, and Great Expectations suite generation
- SQL and Spark performance analysis with actionable optimization recommendations
- Data modeling patterns (star schema, snowflake, data vault, SCD types)
This skill does NOT cover:
- Machine learning model training and serving -- see senior-ml-engineer
- Statistical experiment design and hypothesis testing -- see senior-data-scientist
- Cloud infrastructure provisioning (Terraform, CloudFormation) -- see senior-devops or aws-solution-architect
- Application-level security hardening and vulnerability scanning -- see senior-secops or senior-security
Integration Points
| Skill | Integration | Data Flow |
|---|---|---|
| senior-data-scientist | Feature engineering pipelines consume cleaned data from the mart layer | Data engineer publishes curated datasets; data scientist runs experiments and feeds feature definitions back for productionization |
| senior-ml-engineer | ML training pipelines depend on feature store tables built by data engineering | Data engineer maintains feature store refresh; ML engineer deploys model artifacts and monitoring |
| senior-devops | CI/CD for dbt projects, Airflow deployment, and container orchestration | Data engineer defines pipeline code; DevOps manages infrastructure, Docker images, and deployment workflows |
| senior-architect | Architecture reviews for data platform decisions (lakehouse vs warehouse, Lambda vs Kappa) | Data engineer proposes designs; architect validates against enterprise standards and non-functional requirements |
| senior-backend | API-sourced ingestion and event-driven pipelines consume backend service events | Backend publishes events to Kafka/queues; data engineer builds consumers and transformation layers |
| code-reviewer | Pipeline code reviews for Airflow DAGs, dbt models, and Spark jobs | Data engineer submits PRs; code reviewer validates SQL patterns, idempotency, and error handling |
Tool Reference
pipeline_orchestrator.py
Purpose: Generate pipeline configurations for Airflow, Prefect, and Dagster. Supports ETL pattern generation, dependency management, scheduling, and DAG validation.
Subcommands:
generate -- Generate pipeline code
python scripts/pipeline_orchestrator.py generate \
--type airflow \
--source postgres \
--destination snowflake \
--tables orders,customers \
--schedule "0 5 * * *" \
--mode incremental \
--output dags/my_pipeline.py
| Flag | Short | Required | Default | Description |
|---|---|---|---|---|
| --type | -t | Yes | -- | Pipeline framework: airflow, prefect, dagster |
| --source | -s | No | postgres | Source system type |
| --destination | -d | No | snowflake | Destination system type |
| --tables | -- | No | -- | Comma-separated list of tables to extract |
| --config | -c | No | -- | Configuration YAML file (overrides other source/dest flags) |
| --output | -o | No | stdout | Output file path for generated code |
| --name | -n | No | auto-generated | Pipeline name |
| --schedule | -- | No | 0 5 * * * | Cron schedule expression |
| --mode | -- | No | incremental | Load mode: incremental or full |
Output formats: Generated Python code written to file or printed to stdout.
validate -- Validate existing pipeline code
python scripts/pipeline_orchestrator.py validate \
--dag dags/my_dag.py \
--type airflow
| Flag | Short | Required | Default | Description |
|---|---|---|---|---|
| --dag | -- | Yes | -- | Path to the DAG/flow file to validate |
| --type | -t | Yes | -- | Framework type: airflow, prefect, dagster |
Output formats: JSON validation result with valid (boolean), issues (list), and warnings (list). Exits with code 0 on success, 1 on failure.
template -- Generate from ETL pattern template
python scripts/pipeline_orchestrator.py template \
--pattern extract-load \
--type airflow \
--source postgres \
--destination snowflake \
--tables orders,customers \
--output dags/el_pipeline.py
| Flag | Short | Required | Default | Description |
|---|---|---|---|---|
| --pattern | -p | Yes | -- | ETL pattern: extract-load, transform, cdc |
| --type | -t | Yes | -- | Framework type: airflow, prefect, dagster |
| --source | -s | Yes | -- | Source system type |
| --destination | -d | Yes | -- | Destination system type |
| --tables | -- | Yes | -- | Comma-separated list of tables |
| --output | -o | No | stdout | Output file path |
data_quality_validator.py
Purpose: Comprehensive data quality validation including schema checking, data profiling, anomaly detection, Great Expectations suite generation, and data contract enforcement. Supports CSV, JSON, and JSONL inputs.
Global flags:
| Flag | Short | Description |
|---|---|---|
| --verbose | -v | Enable verbose logging output |
Subcommands:
validate -- Validate data against a schema
python scripts/data_quality_validator.py validate data.csv \
--schema schema.json \
--detect-anomalies \
--output report.json \
--json
| Flag | Short | Required | Default | Description |
|---|---|---|---|---|
| input (positional) | -- | Yes | -- | Input data file (CSV, JSON, JSONL) |
| --schema | -s | No | -- | Schema file (JSON) to validate against |
| --output | -o | No | stdout | Output report file path |
| --json | -- | No | false | Output as JSON instead of human-readable text |
| --detect-anomalies | -- | No | false | Also run statistical anomaly detection (z-score and IQR) |
Output formats: Human-readable validation report (default) or JSON with per-check results, severity, failed rows, and overall quality score.
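The two detectors behind --detect-anomalies can be sketched with the standard library alone. The 3.0 z-score threshold echoes the default mentioned in the troubleshooting table and 1.5 is the conventional IQR multiplier; the validator's own implementation (population vs sample standard deviation, quartile method) may differ, so treat this as an illustration rather than its code.

```python
import statistics

def zscore_outliers(values, threshold=3.0):
    """Values more than `threshold` population standard deviations from the mean."""
    mean = statistics.fmean(values)
    stdev = statistics.pstdev(values)
    if stdev == 0:
        return []  # constant column: nothing can be an outlier
    return [v for v in values if abs(v - mean) / stdev > threshold]

def iqr_outliers(values, multiplier=1.5):
    """Values outside [Q1 - m*IQR, Q3 + m*IQR]."""
    q1, _, q3 = statistics.quantiles(values, n=4)
    iqr = q3 - q1
    low, high = q1 - multiplier * iqr, q3 + multiplier * iqr
    return [v for v in values if v < low or v > high]

values = [10, 11, 12, 11, 10, 12, 11, 10, 11, 12] * 3 + [100]
print(zscore_outliers(values), iqr_outliers(values))
```

The IQR rule depends only on quartiles, so it is less distorted by a heavy tail than the z-score, whose mean and standard deviation are pulled toward the extreme values.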
profile -- Generate a statistical data profile
python scripts/data_quality_validator.py profile data.csv \
--output profile.json \
--json
| Flag | Short | Required | Default | Description |
|---|---|---|---|---|
| input (positional) | -- | Yes | -- | Input data file |
| --output | -o | No | stdout | Output profile file path |
| --json | -- | No | false | Output as JSON |
Output formats: Per-column statistics including null counts, unique counts, min/max/mean/median/std_dev for numerics, length stats for strings, top values, and detected patterns.
generate-suite -- Generate a Great Expectations suite
python scripts/data_quality_validator.py generate-suite data.csv \
--output expectations.json
| Flag | Short | Required | Default | Description |
|---|---|---|---|---|
| input (positional) | -- | Yes | -- | Input data file to base expectations on |
| --output | -o | No | stdout | Output expectations JSON file |
Output formats: JSON expectation suite compatible with Great Expectations, derived from the data profile.
contract -- Validate against a data contract
python scripts/data_quality_validator.py contract data.csv \
--contract contract.yaml \
--output report.json \
--json
| Flag | Short | Required | Default | Description |
|---|---|---|---|---|
| input (positional) | -- | Yes | -- | Input data file |
| --contract | -c | Yes | -- | Data contract file (YAML or JSON) |
| --output | -o | No | stdout | Output report file path |
| --json | -- | No | false | Output as JSON |
Output formats: Contract validation report showing SLA compliance (freshness, completeness, accuracy) and per-field results.
schema -- Infer and generate a schema from data
python scripts/data_quality_validator.py schema data.csv \
--output schema.json
| Flag | Short | Required | Default | Description |
|---|---|---|---|---|
| input (positional) | -- | Yes | -- | Input data file |
| --output | -o | No | stdout | Output schema JSON file |
Output formats: JSON schema with inferred column types, nullability, uniqueness, and detected patterns.
etl_performance_optimizer.py
Purpose: ETL/ELT performance analysis and optimization. Analyzes SQL queries, Spark job metrics, partition strategies, and estimates cloud warehouse costs. Provides actionable recommendations sorted by priority and severity.
Global flags:
| Flag | Short | Description |
|---|---|---|
| --verbose | -v | Enable verbose logging output |
Subcommands:
analyze-sql -- Analyze a SQL query for optimization
```bash
python scripts/etl_performance_optimizer.py analyze-sql query.sql \
  --warehouse snowflake \
  --stats data_stats.json \
  --output recommendations.json \
  --json
```
| Flag | Short | Required | Default | Description |
|---|---|---|---|---|
| input (positional) | -- | Yes | -- | SQL file path or inline query string |
| --warehouse | -w | No | -- | Target warehouse: bigquery, snowflake, redshift, databricks |
| --stats | -s | No | -- | Data statistics JSON file for context-aware recommendations |
| --output | -o | No | stdout | Output file path |
| --json | -- | No | false | Output as JSON |
Output formats: Prioritized list of recommendations with category, severity, title, description, current issue, recommendation, expected improvement, and implementation steps.
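The rule set behind `analyze-sql` is not listed, so the following is a hypothetical, regex-based sketch of three common anti-pattern checks, returning recommendations sorted by severity. Regex matching is a rough heuristic: it can misfire on comments, string literals, or `WHERE EXISTS (...)`, and it only spots a function directly after `WHERE`. A production analyzer would parse the SQL instead.

```python
import re
from dataclasses import dataclass


@dataclass(frozen=True)
class Recommendation:
    category: str
    severity: str  # "high", "medium" or "low"
    title: str
    recommendation: str


SEVERITY_RANK = {"high": 0, "medium": 1, "low": 2}

# Hypothetical rule set: (pattern, recommendation emitted when it matches).
RULES = [
    (re.compile(r"\bselect\s+\*", re.IGNORECASE),
     Recommendation("io", "high", "SELECT * reads every column",
                    "List only the needed columns; columnar engines skip unreferenced ones.")),
    (re.compile(r"\border\s+by\b(?![\s\S]*\blimit\b)", re.IGNORECASE),
     Recommendation("compute", "medium", "ORDER BY without LIMIT",
                    "Drop the sort if consumers do not need ordered output, or add a LIMIT.")),
    (re.compile(r"\bwhere\s+\w+\s*\(", re.IGNORECASE),
     Recommendation("pruning", "medium", "Function wrapped around a filtered column",
                    "Compare the raw column (e.g. a date range) so partition pruning can apply.")),
]


def analyze_sql(query):
    """Return matching recommendations, most severe first."""
    hits = [rec for pattern, rec in RULES if pattern.search(query)]
    return sorted(hits, key=lambda rec: SEVERITY_RANK[rec.severity])
```

For example, `analyze_sql("SELECT * FROM orders WHERE DATE(created_at) = '2024-01-01' ORDER BY id")` flags all three rules, with the high-severity `SELECT *` finding first.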
analyze-spark -- Analyze Spark job metrics
```bash
python scripts/etl_performance_optimizer.py analyze-spark spark_metrics.json \
  --output report.json \
  --json
```
| Flag | Short | Required | Default | Description |
|---|---|---|---|---|
| input (positional) | -- | Yes | -- | Spark metrics JSON file (from Spark History Server or custom export) |
| --output | -o | No | stdout | Output file path |
| --json | -- | No | false | Output as JSON |
Output formats: Analysis of shuffle, memory, GC pressure, skew ratio, and task failure rates with targeted recommendations.
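Two of the metrics named above, skew ratio and GC pressure, have simple common definitions. The sketch below assumes skew ratio = slowest task duration / median task duration and GC pressure = total GC time / total task time. The task dict shape and the thresholds are hypothetical illustrations, not the Spark History Server format or the script's defaults.

```python
import statistics


def analyze_stage(tasks, skew_threshold=3.0, gc_threshold=0.10, failure_threshold=0.05):
    """Compute skew, GC pressure and failure rate for one stage's tasks.

    Each task is a hypothetical dict: {"duration_ms": int, "gc_time_ms": int,
    "failed": bool (optional)}. Thresholds are illustrative defaults.
    """
    durations = [t["duration_ms"] for t in tasks]
    total_ms = sum(durations)
    median_ms = statistics.median(durations)
    # Skew ratio: slowest task relative to the typical (median) task.
    skew_ratio = max(durations) / median_ms if median_ms else float("inf")
    # GC pressure: share of task time spent in JVM garbage collection.
    gc_ratio = sum(t["gc_time_ms"] for t in tasks) / total_ms if total_ms else 0.0
    failure_rate = sum(1 for t in tasks if t.get("failed")) / len(tasks)

    findings = []
    if skew_ratio > skew_threshold:
        findings.append("Data skew: salt hot keys; for skewed joins, "
                        "enable spark.sql.adaptive.skewJoin.enabled")
    if gc_ratio > gc_threshold:
        findings.append("GC pressure: raise executor memory or cache less data")
    if failure_rate > failure_threshold:
        findings.append("Task failures: inspect executor logs for OOM or fetch failures")
    return {"skew_ratio": skew_ratio, "gc_ratio": gc_ratio,
            "failure_rate": failure_rate, "findings": findings}
```

Using the median rather than the mean as the baseline keeps a single straggler from inflating its own reference point.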
optimize-partition -- Recommend partition strategies
```bash
python scripts/etl_performance_optimizer.py optimize-partition data_stats.json \
  --output partitions.json \
  --json
```
| Flag | Short | Required | Default | Description |
|---|---|---|---|---|
| input (positional) | -- | Yes | -- | Data statistics JSON file with column cardinality and distribution info |
| --output | -o | No | stdout | Output file path |
| --json | -- | No | false | Output as JSON |
Output formats: Partition strategy per column including type (range, hash, list), recommended partition count, target partition size in MB, reasoning, and implementation SQL.
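A partition recommendation of this kind can be driven by column type, cardinality, and a target partition size. The heuristic below is a hypothetical sketch: the 256 MB target, the 100-value cutoff for list partitioning, and the daily grain assumed for date columns are illustrative choices, not the script's documented rules.

```python
import math


def recommend_partition(column, table_size_mb, target_mb=256, max_list_values=100):
    """Suggest a partition strategy for one column (hypothetical heuristics).

    `column` is a hypothetical stats dict: {"name": str, "type": str, "cardinality": int}.
    """
    name, col_type, cardinality = column["name"], column["type"], column["cardinality"]
    if col_type in ("date", "timestamp"):
        # Assume daily grain: one range partition per distinct day.
        strategy, count = "range", cardinality
        reasoning = "Time column: range partitions enable pruning on date filters."
    elif cardinality <= max_list_values:
        strategy, count = "list", cardinality
        reasoning = "Few distinct values: one list partition per value."
    else:
        # High cardinality: size hash buckets so each lands near target_mb.
        strategy, count = "hash", math.ceil(table_size_mb / target_mb)
        reasoning = "High cardinality: hash buckets sized to the target file size."
    count = max(1, count)
    return {
        "column": name,
        "type": strategy,
        "partition_count": count,
        "avg_partition_mb": round(table_size_mb / count, 1),
        "reasoning": reasoning,
    }
```

For a 10,000 MB table hashed on a high-cardinality `user_id`, this yields 40 buckets of about 250 MB each. A small `avg_partition_mb` on a date column is a hint to coarsen to monthly partitions and avoid many tiny files.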
estimate-cost -- Estimate query execution cost
```bash
python scripts/etl_performance_optimizer.py estimate-cost query.sql \
  --warehouse snowflake \
  --stats data_stats.json \
  --output cost.json \
  --json
```
| Flag | Short | Required | Default | Description |
|---|---|---|---|---|
| input (positional) | -- | Yes | -- | SQL file path or inline query string |
| --warehouse | -w | Yes | -- | Target warehouse: bigquery, snowflake, redshift, databricks |
| --stats | -s | No | -- | Data statistics JSON file for more accurate estimates |
| --output | -o | No | stdout | Output file path |
| --json | -- | No | false | Output as JSON |
Output formats: Cost breakdown with compute, storage, and data transfer costs in USD plus underlying assumptions.
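The compute portion of such an estimate follows each warehouse's billing model: BigQuery on-demand charges per byte scanned, while Snowflake charges credits for warehouse uptime. The sketch below covers compute only (no storage or data transfer) and is a simplified illustration, not the script's estimator. Prices are deliberately left as arguments because list prices vary by region, edition, and over time; the credit table reflects standard warehouse sizes and should be checked against current Snowflake documentation. The Snowflake function assumes the warehouse is resumed for this query alone, so the 60-second minimum applies.

```python
# Snowflake standard warehouse credit consumption per hour, by size.
SNOWFLAKE_CREDITS_PER_HOUR = {"XS": 1, "S": 2, "M": 4, "L": 8, "XL": 16}
TIB = 1024 ** 4


def bigquery_on_demand_usd(bytes_scanned, usd_per_tib):
    """On-demand BigQuery cost scales with bytes scanned (minimums ignored)."""
    return bytes_scanned / TIB * usd_per_tib


def snowflake_usd(runtime_seconds, size, usd_per_credit):
    """Snowflake bills warehouse uptime per second with a 60-second minimum per resume."""
    billed_seconds = max(60, runtime_seconds)
    return billed_seconds / 3600 * SNOWFLAKE_CREDITS_PER_HOUR[size] * usd_per_credit
```

The contrast explains why the same query can be cheap on one platform and expensive on another: pruning columns and partitions lowers BigQuery's bytes scanned directly, while on Snowflake it pays off only by shortening runtime.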
template -- Generate template input files
```bash
python scripts/etl_performance_optimizer.py template data_stats --output stats_template.json
python scripts/etl_performance_optimizer.py template spark_metrics --output metrics_template.json
```
| Flag | Short | Required | Default | Description |
|---|---|---|---|---|
| template (positional) | -- | Yes | -- | Template type: data_stats or spark_metrics |
| --output | -o | No | stdout | Output file path |
Output formats: JSON template with placeholder values showing the expected structure for --stats input files or Spark metrics files.