Agent skill

airflow-dag-analyzer

Analyzes, validates, and optimizes Apache Airflow DAGs for reliability, performance, and best practices adherence.

Stars 514
Forks 31

Install this agent skill to your Project

npx add-skill https://github.com/a5c-ai/babysitter/tree/main/library/specializations/data-engineering-analytics/skills/airflow-dag-analyzer

SKILL.md

Airflow DAG Analyzer

Analyzes, validates, and optimizes Apache Airflow DAGs for reliability and performance.

Overview

This skill examines Apache Airflow DAG definitions to identify performance bottlenecks, reliability issues, and best practice violations. It provides recommendations for task dependency optimization, parallelism configuration, error handling, and resource management.

Capabilities

  • DAG structure analysis and validation - Parse and validate DAG structure
  • Task dependency optimization - Identify bottlenecks and suggest parallel execution
  • Parallelism and concurrency recommendations - Optimize pool and slot allocation
  • SLA and timeout configuration - Recommend appropriate timeouts and SLAs
  • Retry and failure handling patterns - Validate retry logic and alerting
  • Sensor optimization - Smart sensors, deferrable operators, reschedule mode
  • Resource pool allocation - Optimize pool usage and worker distribution
  • DAG scheduling optimization - Catchup, backfill, and schedule interval tuning
  • Cross-DAG dependency detection - Identify external dependencies and triggers

Input Schema

json
{
  "dagCode": {
    "type": "string",
    "description": "The Python DAG definition code",
    "required": true
  },
  "dagId": {
    "type": "string",
    "description": "The DAG identifier"
  },
  "executionHistory": {
    "type": "object",
    "description": "Historical execution metrics",
    "properties": {
      "runs": {
        "type": "array",
        "items": {
          "dagRunId": "string",
          "executionDate": "string",
          "duration": "number",
          "state": "string",
          "taskDurations": "object"
        }
      }
    }
  },
  "clusterConfig": {
    "type": "object",
    "properties": {
      "workerCount": "number",
      "executorType": "string",
      "poolConfigs": "object",
      "airflowVersion": "string"
    }
  },
  "analysisScope": {
    "type": "array",
    "items": {
      "type": "string",
      "enum": ["structure", "performance", "reliability", "resources", "security"]
    },
    "default": ["structure", "performance", "reliability"]
  }
}

Output Schema

json
{
  "validationResults": {
    "errors": {
      "type": "array",
      "items": {
        "code": "string",
        "message": "string",
        "line": "number",
        "severity": "error"
      }
    },
    "warnings": {
      "type": "array",
      "items": {
        "code": "string",
        "message": "string",
        "line": "number",
        "severity": "warning"
      }
    }
  },
  "optimizations": {
    "type": "array",
    "items": {
      "category": "string",
      "current": "string",
      "recommended": "string",
      "impact": "high|medium|low",
      "effort": "string",
      "codeChange": "string"
    }
  },
  "recommendedConfig": {
    "type": "object",
    "properties": {
      "poolSize": "number",
      "maxActiveRuns": "number",
      "concurrency": "number",
      "defaultRetries": "number",
      "executionTimeout": "string"
    }
  },
  "dependencyGraph": {
    "type": "object",
    "properties": {
      "nodes": "array",
      "edges": "array",
      "criticalPath": "array",
      "parallelGroups": "array"
    }
  },
  "metrics": {
    "taskCount": "number",
    "maxDepth": "number",
    "parallelizationRatio": "number",
    "estimatedDuration": "string"
  },
  "securityFindings": {
    "type": "array",
    "items": {
      "severity": "high|medium|low",
      "finding": "string",
      "recommendation": "string"
    }
  }
}

Usage Examples

Basic DAG Analysis

json
{
  "dagCode": "from airflow import DAG\nfrom airflow.operators.python import PythonOperator\n...",
  "dagId": "daily_etl_pipeline"
}

With Execution History

json
{
  "dagCode": "...",
  "dagId": "daily_etl_pipeline",
  "executionHistory": {
    "runs": [
      {
        "dagRunId": "manual__2024-01-15",
        "duration": 3600,
        "state": "success",
        "taskDurations": {
          "extract": 600,
          "transform": 1800,
          "load": 1200
        }
      }
    ]
  }
}

Full Analysis with Cluster Config

json
{
  "dagCode": "...",
  "dagId": "complex_ml_pipeline",
  "clusterConfig": {
    "workerCount": 8,
    "executorType": "KubernetesExecutor",
    "poolConfigs": {
      "default_pool": {"slots": 128},
      "ml_pool": {"slots": 32}
    },
    "airflowVersion": "2.8.0"
  },
  "analysisScope": ["structure", "performance", "reliability", "resources", "security"]
}

Validation Rules

DAG Definition Rules

Rule Severity Description
DAG-001 Error Missing DAG default_args
DAG-002 Error Invalid schedule_interval
DAG-003 Warning Catchup enabled for long-running DAG
DAG-004 Warning No email on failure configured
DAG-005 Info Consider using @dag decorator

Task Definition Rules

Rule Severity Description
TSK-001 Error Task has no upstream or downstream
TSK-002 Warning Task missing retries configuration
TSK-003 Warning Execution timeout not set
TSK-004 Warning PythonOperator with no pool
TSK-005 Info Consider TaskGroup for related tasks

Sensor Rules

Rule Severity Description
SEN-001 Warning Sensor in poke mode (use reschedule)
SEN-002 Warning Sensor missing timeout
SEN-003 Info Consider deferrable operator
SEN-004 Warning External sensor without soft_fail

Security Rules

Rule Severity Description
SEC-001 Error Hardcoded credentials
SEC-002 Warning Using Variable.get without default
SEC-003 Warning Connection ID not parameterized
SEC-004 Info Consider Secrets Backend

Optimization Patterns

Parallelization

python
# Before: Sequential execution
task1 >> task2 >> task3 >> task4

# After: Parallel execution where possible
task1 >> [task2, task3] >> task4

Sensor Optimization

python
# Before: Poke mode (blocks worker)
FileSensor(
    task_id='wait_for_file',
    filepath='/data/input.csv',
    mode='poke'  # Bad
)

# After: Reschedule mode (releases worker)
FileSensor(
    task_id='wait_for_file',
    filepath='/data/input.csv',
    mode='reschedule',  # Good
    poke_interval=300
)

# Best: Deferrable (Airflow 2.2+)
from airflow.sensors.filesystem import FileSensor
FileSensor(
    task_id='wait_for_file',
    filepath='/data/input.csv',
    deferrable=True
)

TaskGroups

python
# Before: Flat task structure
extract_orders >> transform_orders >> load_orders
extract_products >> transform_products >> load_products

# After: TaskGroups for organization
with TaskGroup('orders') as orders_group:
    extract >> transform >> load

with TaskGroup('products') as products_group:
    extract >> transform >> load

Dynamic Task Mapping (Airflow 2.3+)

python
# Before: Static task generation
for i in range(10):
    PythonOperator(task_id=f'process_{i}', ...)

# After: Dynamic task mapping
@task
def process_item(item):
    return item * 2

process_item.expand(item=[1, 2, 3, 4, 5])

Configuration Recommendations

Default Args Template

python
default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email': ['alerts@company.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'max_retry_delay': timedelta(minutes=30),
    'execution_timeout': timedelta(hours=2),
    'sla': timedelta(hours=1),
}

Pool Configuration

Workload Type Recommended Pool Size
Heavy compute 2-4 per worker
I/O bound 8-16 per worker
API calls Rate limit based
Sensors Separate pool, high slots

Integration Points

MCP Server Integration

  • yangkyeongmo/mcp-server-apache-airflow - Airflow REST API integration
  • Dagster MCP - Alternative orchestration patterns
  • Prefect MCP - Modern orchestration comparison

Related Skills

  • dbt Project Analyzer (SK-DEA-003) - dbt operator optimization
  • Data Lineage Mapper (SK-DEA-010) - Task lineage extraction

Applicable Processes

  • ETL/ELT Pipeline (etl-elt-pipeline.js)
  • A/B Testing Pipeline (ab-testing-pipeline.js)
  • Pipeline Migration (pipeline-migration.js)
  • Data Quality Framework (data-quality-framework.js)

References

Version History

  • 1.0.0 - Initial release with Airflow 2.x support

Expand your agent's capabilities with these related and highly-rated skills.

a5c-ai/babysitter

gsd-tools

Central utility skill for GSD operations. Provides config parsing, slug generation, timestamps, path operations, and orchestrates calls to other specialized skills. Acts as the unified entry point that the original gsd-tools.cjs provided via its lib/ modules (commands, config, core, init).

514 31
Explore
a5c-ai/babysitter

model-profile-resolution

Resolve model profile (quality/balanced/budget) at orchestration start and map agents to specific models. Enables cost/quality tradeoffs by selecting appropriate AI models for each agent role.

514 31
Explore
a5c-ai/babysitter

verification-suite

Plan structure validation, phase completeness checks, reference integrity verification, and artifact existence confirmation. Provides the structured verification layer ensuring GSD artifacts are well-formed and complete.

514 31
Explore
a5c-ai/babysitter

state-management

STATE.md reading, writing, and field-level updates. Provides cross-session state persistence via .planning/STATE.md with structured fields for current task, completed phases, blockers, decisions, and quick tasks.

514 31
Explore
a5c-ai/babysitter

git-integration

Git commit patterns, formats, and conventions for GSD methodology. Provides atomic commits per task, structured commit messages, planning file commits, branch management, and milestone tag operations.

514 31
Explore
a5c-ai/babysitter

frontmatter-parsing

YAML frontmatter parsing and manipulation for .planning/ documents. Provides read, write, update, query, and validation operations on frontmatter blocks in GSD markdown artifacts.

514 31
Explore

Didn't find tool you were looking for?

Be as detailed as possible for better results