Agent skill

airflow-3x-migration

Guide and patterns for migrating Apache Airflow 2.x workflows to Airflow 3.x, covering import path changes, removed features, and recommended patterns such as Asset-based scheduling and the TaskFlow API.

Install this agent skill in your project

npx add-skill https://github.com/majiayu000/claude-skill-registry/tree/main/skills/data/airflow-3x-migration

SKILL.md

Airflow 3.x Skills

Import Path Changes

Operators

python
# Airflow 2.x
from airflow.operators.python import PythonOperator

# Airflow 3.x
from airflow.providers.standard.operators.python import PythonOperator
from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.operators.empty import EmptyOperator

Sensors

python
# Airflow 3.x
from airflow.providers.standard.sensors.filesystem import FileSensor
from airflow.providers.standard.sensors.time import TimeSensor
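
The import moves above are mechanical, so they can be applied with a small rewrite pass. The following is a minimal sketch, not an official migration tool: `LEGACY_TO_STANDARD` and `rewrite_imports` are hypothetical names, and the 2.x module paths for BashOperator, EmptyOperator, FileSensor, and TimeSensor are assumed to be the usual Airflow 2.x locations, so check them against your own DAGs.

```python
import re

# Assumed 2.x -> 3.x module moves, based on the import examples above.
LEGACY_TO_STANDARD = {
    "airflow.operators.python": "airflow.providers.standard.operators.python",
    "airflow.operators.bash": "airflow.providers.standard.operators.bash",
    "airflow.operators.empty": "airflow.providers.standard.operators.empty",
    "airflow.sensors.filesystem": "airflow.providers.standard.sensors.filesystem",
    "airflow.sensors.time_sensor": "airflow.providers.standard.sensors.time",
}

# Capture the "from " / "import " prefix and the dotted module path after it.
_IMPORT_RE = re.compile(r"^([ \t]*(?:from|import)[ \t]+)([\w.]+)", re.MULTILINE)


def rewrite_imports(source: str) -> str:
    """Return source with known legacy module paths replaced."""

    def _swap(match: re.Match) -> str:
        prefix, module = match.group(1), match.group(2)
        # Unknown modules (including already-migrated ones) pass through unchanged.
        return prefix + LEGACY_TO_STANDARD.get(module, module)

    return _IMPORT_RE.sub(_swap, source)
```

Because only exact module matches are swapped, running the helper twice leaves the file unchanged, which makes it safe to apply across a whole DAG folder.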

Removed Features

| Removed | Replacement |
| --- | --- |
| SubDagOperator | TaskGroup |
| packaged_dag_processor | Use standard DAG loading |
| airflow.contrib.* | Provider packages |
| schedule_interval param | schedule param |
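
One way to find removed features before upgrading is a static scan of the DAG source. This is a rough sketch using only the standard-library `ast` module; `find_removed_features` and the three constant sets are hypothetical names, and the scan covers only the SubDagOperator, `airflow.contrib`, and `schedule_interval` rows listed above.

```python
import ast

REMOVED_NAMES = {"SubDagOperator"}              # replaced by TaskGroup
REMOVED_KEYWORDS = {"schedule_interval"}        # replaced by schedule
REMOVED_MODULE_PREFIXES = ("airflow.contrib",)  # replaced by provider packages


def find_removed_features(source: str) -> list[tuple[int, str]]:
    """Return sorted (line number, message) pairs for removed-feature usage."""
    findings = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.ImportFrom) and node.module:
            if node.module.startswith(REMOVED_MODULE_PREFIXES):
                findings.append((node.lineno, f"import from {node.module}"))
            for alias in node.names:
                if alias.name in REMOVED_NAMES:
                    findings.append((node.lineno, f"import of {alias.name}"))
        elif isinstance(node, ast.Import):
            for alias in node.names:
                if alias.name.startswith(REMOVED_MODULE_PREFIXES):
                    findings.append((node.lineno, f"import of {alias.name}"))
        elif isinstance(node, ast.Call):
            # The reported line is where the call starts, not the keyword itself.
            for kw in node.keywords:
                if kw.arg in REMOVED_KEYWORDS:
                    findings.append((node.lineno, f"keyword {kw.arg}"))
    return sorted(findings)
```

The scan parses the file without importing it, so it works on a machine where Airflow is not installed.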

DAG Definition Changes

python
# Airflow 3.x preferred
from datetime import datetime

from airflow.sdk import DAG  # airflow.sdk is the public DAG-authoring interface in 3.x

with DAG(
    dag_id="my_dag",
    schedule="@daily",  # Not schedule_interval
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["betting", "sports"],
) as dag:
    ...

TaskFlow API (Preferred)

python
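from datetime import datetime

from airflow.sdk import dag, task  # 3.x home of the decorators formerly in airflow.decorators

# fetch_games and calculate_elo are project-specific helpers assumed to be importable here

@dag(schedule="@daily", start_date=datetime(2024, 1, 1), catchup=False)
def betting_workflow():

    @task
    def download_games(sport: str) -> list:
        # Return values are automatically passed via XCom
        return fetch_games(sport)

    @task
    def update_elo(games: list) -> dict:
        return calculate_elo(games)

    # Chain tasks: passing the return value creates the dependency
    games = download_games("nba")
    ratings = update_elo(games)

# Calling the decorated function registers the DAG
betting_dag = betting_workflow()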

Asset-Based Scheduling (Replaces Dataset)

python
from airflow.sdk import Asset, dag, task

# Define assets
games_data = Asset("games_data")
elo_ratings = Asset("elo_ratings")

# Producer DAG
@dag(schedule="@daily")
def download_dag():
    @task(outlets=[games_data])
    def download():
        ...

    download()  # Call the task so it is added to the DAG

# Consumer DAG - triggers when asset updates
@dag(schedule=[games_data])
def process_dag():
    @task
    def process():
        ...

    process()

# Instantiate both DAGs so they are registered
download_dag()
process_dag()
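
Existing Dataset-based DAGs need both the import and the constructor calls renamed. The helper below is a minimal sketch of that rename, assuming the 2.x code used `from airflow.datasets import Dataset` and called `Dataset(...)` directly; `dataset_to_asset` is a hypothetical name, and qualified usages such as `airflow.datasets.Dataset(...)` or aliased imports are not handled.

```python
import re


def dataset_to_asset(source: str) -> str:
    """Rewrite the common Dataset import and constructor calls to Asset."""
    # Swap the import line; [ \t]* keeps surrounding blank lines intact.
    source = re.sub(
        r"^from[ \t]+airflow\.datasets[ \t]+import[ \t]+Dataset[ \t]*$",
        "from airflow.sdk import Asset",
        source,
        flags=re.MULTILINE,
    )
    # Rename constructor calls; \b avoids touching names such as MyDataset.
    return re.sub(r"\bDataset\(", "Asset(", source)
```

Review the resulting diff by hand, since a text-level rename cannot tell an Airflow Dataset from an unrelated class of the same name.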

Setup/Teardown Tasks

python
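from airflow.sdk import setup, task, teardown

# create_connection and close_connection are project-specific helpers (assumed).
# Pass an XCom-serializable handle such as a connection ID, not a live connection object.

@setup
def setup_db_connection() -> str:
    return create_connection()

@teardown
def cleanup_connection(conn_id: str):
    close_connection(conn_id)

@task
def process_data(conn_id: str):
    ...

# Define setup/teardown relationship (assumes `dag` is defined elsewhere)
with dag:
    conn = setup_db_connection()
    # The teardown runs even if process_data fails
    process_data(conn) >> cleanup_connection(conn)

    # With plain @task functions, mark the roles explicitly instead:
    # conn >> work >> cleanup.as_teardown(setups=conn)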

DAG Versioning

python
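from airflow.sdk import DAG

# Airflow 3.x tracks DAG versions automatically as the DAG definition changes.
# There is no `version` argument on DAG; passing one raises an error.
with DAG(
    dag_id="betting_workflow",
    schedule="@daily",
) as dag:
    ...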

Backfill Changes

bash
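# Airflow 3.x - trigger a single run through the REST API
# (the API now lives under /api/v2; $TOKEN is assumed to hold a valid access token)
curl -X POST "http://localhost:8080/api/v2/dags/my_dag/dagRuns" \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"logical_date": "2024-01-15T00:00:00Z"}'

# Or backfill a date range with the new backfill command
# (replaces the removed `airflow dags backfill`)
airflow backfill create --dag-id my_dag --from-date 2024-01-01 --to-date 2024-01-15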
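
When backfills are launched from scripts, building the argument list in code avoids quoting mistakes. This is a minimal sketch, assuming the Airflow 3 `airflow backfill create` subcommand with `--dag-id`, `--from-date`, and `--to-date` flags; confirm the flags with `airflow backfill create --help` on your version. `backfill_command` is a hypothetical helper and only builds the command without running it.

```python
import shlex
from datetime import date


def backfill_command(dag_id: str, start: date, end: date) -> list[str]:
    """Build argv for a date-range backfill without executing it."""
    if end < start:
        raise ValueError("end date must not precede start date")
    return [
        "airflow", "backfill", "create",
        "--dag-id", dag_id,
        "--from-date", start.isoformat(),
        "--to-date", end.isoformat(),
    ]


cmd = backfill_command("my_dag", date(2024, 1, 1), date(2024, 1, 15))
print(shlex.join(cmd))
# airflow backfill create --dag-id my_dag --from-date 2024-01-01 --to-date 2024-01-15
```

The returned list can be passed to `subprocess.run(cmd, check=True)` once the flags have been verified.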

New REST API Endpoints

python
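import requests

BASE_URL = "http://localhost:8080"

# Airflow 3.x serves the REST API under /api/v2 and expects a bearer token.
# The /auth/token call assumes the configured auth manager accepts these credentials.
token = requests.post(
    f"{BASE_URL}/auth/token",
    json={"username": "admin", "password": "admin"},
).json()["access_token"]
headers = {"Authorization": f"Bearer {token}"}

# Get DAG runs
response = requests.get(
    f"{BASE_URL}/api/v2/dags/betting_workflow/dagRuns",
    headers=headers,
)

# Trigger DAG (logical_date must be present; None lets Airflow choose)
response = requests.post(
    f"{BASE_URL}/api/v2/dags/betting_workflow/dagRuns",
    json={"logical_date": None, "conf": {"sport": "nba"}},
    headers=headers,
)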
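
To check a client against the new API path without a running webserver, the request can be built and inspected before it is sent. The sketch below uses only the standard library; `build_trigger_request` is a hypothetical helper, and the `/api/v2` prefix, bearer-token header, and `logical_date` field are assumptions about the Airflow 3 API that should be confirmed against the REST API reference for your version.

```python
import json
import urllib.request


def build_trigger_request(
    base_url: str, dag_id: str, token: str, conf: dict
) -> urllib.request.Request:
    """Build (but do not send) a POST that triggers a DAG run."""
    url = f"{base_url.rstrip('/')}/api/v2/dags/{dag_id}/dagRuns"
    body = json.dumps({"logical_date": None, "conf": conf}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {token}",
        },
    )


req = build_trigger_request(
    "http://localhost:8080", "betting_workflow", "example-token", {"sport": "nba"}
)
print(req.get_method(), req.full_url)
# POST http://localhost:8080/api/v2/dags/betting_workflow/dagRuns
```

Passing `req` to `urllib.request.urlopen` sends it; keeping construction separate makes the URL and payload easy to unit test.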

Edge Labels

python
from airflow.sdk import Label  # previously airflow.utils.edgemodifier

# download, process, and alert are tasks defined elsewhere in the DAG
download >> Label("success") >> process
download >> Label("failure") >> alert

Migration Checklist

  • Update all operator imports to provider packages
  • Replace schedule_interval with schedule
  • Convert SubDags to TaskGroups
  • Replace Dataset with Asset
  • Test DAG parsing with python dags/my_dag.py
  • Update docker-compose to Airflow 3.x image
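
The parse check in the list above can be run over a whole DAG folder. This is a minimal sketch, assuming each DAG file can be executed directly with the same interpreter that has Airflow installed; `check_dag_files` is a hypothetical helper, and the 120-second timeout is an arbitrary choice.

```python
import subprocess
import sys
from pathlib import Path


def check_dag_files(dag_dir: str) -> dict[str, str]:
    """Run each .py file under dag_dir; return {path: stderr} for failures."""
    failures = {}
    for path in sorted(Path(dag_dir).rglob("*.py")):
        result = subprocess.run(
            [sys.executable, str(path)],
            capture_output=True,
            text=True,
            timeout=120,
        )
        # A non-zero exit code means the file raised at import/parse time.
        if result.returncode != 0:
            failures[str(path)] = result.stderr.strip()
    return failures
```

An empty result means every file executed cleanly; import errors from moved modules show up in the captured stderr for the failing file.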

Files to Reference

Airflow 3.x CLI

  • The CLI has changed significantly since Airflow 2.x; several commands were renamed, moved, or removed.
  • Check the latest Airflow 3.x documentation before running CLI commands.
