airflow-hitl
Use when the user needs human-in-the-loop workflows in Airflow (approval/reject, form input, or human-driven branching). Covers ApprovalOperator, HITLOperator, HITLBranchOperator, HITLEntryOperator. Requires Airflow 3.1+. Does not cover AI/LLM calls (see airflow-ai).
Install this agent skill to your Project
npx add-skill https://github.com/astronomer/agents/tree/main/skills/airflow-hitl
Airflow Human-in-the-Loop Operators
Implement human approval gates, form inputs, and human-driven branching in Airflow DAGs using the HITL operators. These deferrable operators pause workflow execution until a human responds via the Airflow UI or REST API.
Implementation Checklist
Execute steps in order. Prefer deferrable HITL operators over custom sensors/polling loops.
CRITICAL: Requires Airflow 3.1+. NOT available in Airflow 2.x.
Deferrable: All HITL operators are deferrable—they release their worker slot while waiting for human input.
UI Location: View pending actions at Browse → Required Actions in Airflow UI. Respond via the task instance page's Required Actions tab or the REST API.
Cross-reference: For AI/LLM calls, see the airflow-ai skill.
Step 1: Choose operator
| Operator | Human action | Outcome |
|---|---|---|
| ApprovalOperator | Approve or Reject | Reject causes downstream tasks to be skipped (approval task itself succeeds) |
| HITLOperator | Select option(s) + form | Returns selections |
| HITLBranchOperator | Select downstream task(s) | Runs selected, skips others |
| HITLEntryOperator | Submit form | Returns form data |
Step 2: Implement operator
ApprovalOperator
from airflow.providers.standard.operators.hitl import ApprovalOperator
from airflow.sdk import dag, task, chain, Param
from pendulum import datetime

@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def approval_example():
    @task
    def prepare():
        return "Review quarterly report"

    approval = ApprovalOperator(
        task_id="approve_report",
        subject="Report Approval",
        body="{{ ti.xcom_pull(task_ids='prepare') }}",
        defaults="Approve",  # Optional: auto on timeout
        params={"comments": Param("", type="string")},
    )

    @task
    def after_approval(result):
        print(f"Decision: {result['chosen_options']}")

    chain(prepare(), approval)
    after_approval(approval.output)

approval_example()
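The downstream task above reads the decision from the chosen_options entry of the operator's output. As a minimal sketch of turning that entry into a boolean, the helper below assumes the output is a dict whose chosen_options value is a list of labels and that the approving label is "Approve" (matching the defaults value above). The name is_approved is hypothetical and not part of Airflow.

```python
def is_approved(result: dict, approve_label: str = "Approve") -> bool:
    """Return True when the approve label is among the chosen options.

    Assumes `result` has the shape used in the example above: a dict with a
    "chosen_options" entry holding a list of option labels.
    """
    chosen = result.get("chosen_options") or []
    return approve_label in chosen


# Illustrative payloads (made up for this sketch, not captured from Airflow)
approved = is_approved({"chosen_options": ["Approve"], "params_input": {"comments": "ok"}})
rejected = is_approved({"chosen_options": ["Reject"], "params_input": {"comments": ""}})
```

A helper like this could be called inside after_approval to keep label comparisons in one place.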
HITLOperator
Required parameters: subject and options.
from airflow.providers.standard.operators.hitl import HITLOperator
from airflow.sdk import dag, task, chain, Param
from datetime import timedelta
from pendulum import datetime

@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def hitl_example():
    hitl = HITLOperator(
        task_id="select_option",
        subject="Select Payment Method",
        body="Choose how to process payment",
        options=["ACH", "Wire", "Check"],  # REQUIRED
        defaults=["ACH"],
        multiple=False,
        execution_timeout=timedelta(hours=4),
        params={"amount": Param(1000, type="number")},
    )

    @task
    def process(result):
        print(f"Selected: {result['chosen_options']}")
        print(f"Amount: {result['params_input']['amount']}")

    process(hitl.output)

hitl_example()
HITLBranchOperator
IMPORTANT: Options can either:
- Directly match downstream task IDs - simpler approach
- Use options_mapping - for human-friendly labels that map to task IDs
from airflow.providers.standard.operators.hitl import HITLBranchOperator
from airflow.sdk import dag, task, chain
from pendulum import datetime

DEPTS = ["marketing", "engineering", "sales"]

@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def branch_example():
    branch = HITLBranchOperator(
        task_id="select_dept",
        subject="Select Departments",
        options=[f"Fund {d}" for d in DEPTS],
        options_mapping={f"Fund {d}": d for d in DEPTS},
        multiple=True,
    )

    for dept in DEPTS:
        @task(task_id=dept)
        def handle(dept_name: str = dept):
            # Bind the loop variable at definition time to avoid late-binding bugs
            print(f"Processing {dept_name}")

        chain(branch, handle())

branch_example()
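To make the label-to-task-ID idea concrete, here is a small plain-Python sketch of how human-friendly labels could be translated into downstream task IDs. It only illustrates the mapping described above and is not the operator's internal implementation. The function name resolve_task_ids is hypothetical.

```python
DEPTS = ["marketing", "engineering", "sales"]
OPTIONS_MAPPING = {f"Fund {d}": d for d in DEPTS}


def resolve_task_ids(chosen_options: list, options_mapping: dict) -> list:
    """Translate chosen labels into task IDs.

    Labels without a mapping entry are used as-is, which mirrors the simpler
    approach where options directly match downstream task IDs.
    """
    return [options_mapping.get(label, label) for label in chosen_options]


# A reviewer picks two of the three labels
selected = resolve_task_ids(["Fund marketing", "Fund sales"], OPTIONS_MAPPING)
# Everything not selected would be skipped
skipped = [d for d in DEPTS if d not in selected]
```

With the choices shown, selected holds the marketing and sales task IDs and skipped holds engineering.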
HITLEntryOperator
from airflow.providers.standard.operators.hitl import HITLEntryOperator
from airflow.sdk import dag, task, chain, Param
from pendulum import datetime

@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def entry_example():
    entry = HITLEntryOperator(
        task_id="get_input",
        subject="Enter Details",
        body="Provide response",
        params={
            "response": Param("", type="string"),
            "priority": Param("p3", type="string"),
        },
    )

    @task
    def process(result):
        print(f"Response: {result['params_input']['response']}")

    process(entry.output)

entry_example()
Step 3: Optional features
Notifiers
from airflow.sdk import BaseNotifier, Context
from airflow.providers.standard.operators.hitl import HITLOperator

class MyNotifier(BaseNotifier):
    template_fields = ("message",)

    def __init__(self, message=""):
        self.message = message

    def notify(self, context: Context):
        if context["ti"].state == "running":
            url = HITLOperator.generate_link_to_ui_from_context(context, base_url="https://airflow.example.com")
            self.log.info(f"Action needed: {url}")

hitl = HITLOperator(..., notifiers=[MyNotifier("{{ task.subject }}")])
Restrict respondents
Format depends on your auth manager:
| Auth Manager | Format | Example |
|---|---|---|
| SimpleAuthManager | Username | ["admin", "manager"] |
| FabAuthManager | Email | ["manager@example.com"] |
| Astro | Astro ID | ["cl1a2b3cd456789ef1gh2ijkl3"] |
Astro Users: Find Astro ID at Organization → Access Management.
hitl = HITLOperator(..., respondents=["manager@example.com"]) # FabAuthManager
Timeout behavior
- With defaults: Task succeeds, default option(s) selected
- Without defaults: Task fails on timeout
hitl = HITLOperator(
    ...,
    options=["Option A", "Option B"],
    defaults=["Option A"],  # Auto-selected on timeout
    execution_timeout=timedelta(hours=4),
)
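The two timeout outcomes can be modeled in a few lines of plain Python. This is only a sketch of the rule stated above, not Airflow's implementation. HITLTimeoutError and outcome_on_timeout are hypothetical names standing in for the task failure and the decision logic.

```python
from typing import List, Optional


class HITLTimeoutError(Exception):
    """Hypothetical stand-in for the task failing when nobody responds in time."""


def outcome_on_timeout(defaults: Optional[List[str]]) -> List[str]:
    """Return the options selected on timeout, or raise if no defaults exist."""
    if defaults:
        # With defaults: the task succeeds and the default option(s) are selected
        return list(defaults)
    # Without defaults: the task fails on timeout
    raise HITLTimeoutError("no response before timeout and no defaults configured")


selected_on_timeout = outcome_on_timeout(["Option A"])
```

Calling outcome_on_timeout(None) raises the hypothetical error, which corresponds to the failing case.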
Markdown in body
The body parameter supports markdown formatting and is Jinja templatable:
hitl = HITLOperator(
    ...,
    body="""**Total Budget:** {{ ti.xcom_pull(task_ids='get_budget') }}
| Category | Amount |
|----------|--------|
| Marketing | $1M |
""",
)
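When the table rows come from data rather than a hand-written string, the body can be assembled before it is passed to the operator. The sketch below is one possible approach under stated assumptions: build_body is a hypothetical helper, the rows are made-up illustration values, and the Jinja expression is left as literal text for Airflow to render later.

```python
def build_body(rows: dict, budget_task_id: str = "get_budget") -> str:
    """Build a markdown body with a templated header line and a table of rows."""
    lines = [
        # Concatenation keeps the Jinja braces literal (no f-string escaping needed)
        "**Total Budget:** {{ ti.xcom_pull(task_ids='" + budget_task_id + "') }}",
        "",
        "| Category | Amount |",
        "|----------|--------|",
    ]
    lines += [f"| {category} | {amount} |" for category, amount in rows.items()]
    return "\n".join(lines) + "\n"


# Illustrative rows only; real values would come from your own data
body = build_body({"Marketing": "$1M", "Engineering": "$2M"})
```

The resulting string can be passed as the body argument of any HITL operator.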
Callbacks
All HITL operators support standard Airflow callbacks:
def on_hitl_failure(context):
    print(f"HITL task failed: {context['task_instance'].task_id}")

def on_hitl_success(context):
    print(f"HITL task succeeded with: {context['task_instance'].xcom_pull()}")

hitl = HITLOperator(
    task_id="approval_required",
    subject="Review needed",
    options=["Approve", "Reject"],
    on_failure_callback=on_hitl_failure,
    on_success_callback=on_hitl_success,
)
Step 4: API integration
For external responders (Slack, custom app):
import os

import requests

HOST = os.getenv("AIRFLOW_HOST")
TOKEN = os.getenv("AIRFLOW_API_TOKEN")

# Get pending actions
r = requests.get(
    f"{HOST}/api/v2/hitlDetails/?state=pending",
    headers={"Authorization": f"Bearer {TOKEN}"},
)

# Respond (dag_id, run_id, and task_id are placeholders for the task instance to answer)
requests.patch(
    f"{HOST}/api/v2/hitlDetails/{dag_id}/{run_id}/{task_id}",
    headers={"Authorization": f"Bearer {TOKEN}"},
    json={"chosen_options": ["ACH"], "params_input": {"amount": 1500}},
)
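For an external responder it can help to separate building the request from sending it, so the URL and payload can be checked without a live Airflow instance. The sketch below reuses the path and payload shape from the snippet above. build_hitl_response is a hypothetical helper, and percent-encoding the run ID is an added assumption, on the grounds that run IDs often contain characters such as ":" and "+".

```python
from urllib.parse import quote


def build_hitl_response(host, token, dag_id, run_id, task_id, chosen_options, params_input=None):
    """Return (url, headers, payload) for answering a pending HITL action."""
    # Percent-encode each path segment so characters like ':' and '+' survive
    path = "/".join(quote(part, safe="") for part in (dag_id, run_id, task_id))
    url = f"{host.rstrip('/')}/api/v2/hitlDetails/{path}"
    headers = {"Authorization": f"Bearer {token}"}
    payload = {
        "chosen_options": list(chosen_options),
        "params_input": dict(params_input or {}),
    }
    return url, headers, payload


# Example values are made up for illustration
url, headers, payload = build_hitl_response(
    "https://airflow.example.com/",
    "tok",
    "payments",
    "manual__2025-01-01T00:00:00+00:00",
    "select_option",
    ["ACH"],
    {"amount": 1500},
)
```

The three values can then be passed to requests.patch(url, headers=headers, json=payload).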
Step 5: Safety checks
Before finalizing, verify:
- Airflow 3.1+ installed
- For HITLBranchOperator: options map to downstream task IDs
- defaults values are in options list
- API token configured if using external responders
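Two of these checks (defaults within options, and branch options resolving to downstream task IDs) can be expressed as a small pre-flight function. This is a sketch in plain Python with a hypothetical name, check_hitl_config. It does not inspect a real DAG, so the downstream task IDs are supplied by the caller.

```python
def check_hitl_config(options, defaults=None, options_mapping=None, downstream_task_ids=None):
    """Return a list of human-readable problems; an empty list means the checks passed."""
    problems = []

    # Every default must be one of the offered options
    for value in defaults or []:
        if value not in options:
            problems.append(f"default {value!r} is not in options")

    # For branching: each option (or its mapped value) must be a downstream task ID
    if downstream_task_ids is not None:
        mapping = options_mapping or {}
        for option in options:
            target = mapping.get(option, option)
            if target not in downstream_task_ids:
                problems.append(f"option {option!r} maps to unknown task ID {target!r}")

    return problems


ok = check_hitl_config(["ACH", "Wire"], defaults=["ACH"])
bad = check_hitl_config(["ACH"], defaults=["Check"])
```

An empty list means the checks passed, while each string in a non-empty list describes one mismatch to fix before deploying the DAG.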
Reference
- Airflow HITL Operators: https://airflow.apache.org/docs/apache-airflow-providers-standard/stable/operators/hitl.html
Related Skills
- airflow-ai: For AI/LLM task decorators and GenAI patterns
- authoring-dags: For general DAG writing best practices
- testing-dags: For testing DAGs with debugging cycles