skills/airflow-hitl/SKILL.md
Use when the user needs human-in-the-loop workflows in Airflow (approval/reject, form input, or human-driven branching). Covers ApprovalOperator, HITLOperator, HITLBranchOperator, HITLEntryOperator, HITLTrigger. Requires Airflow 3.1+. Does not cover AI/LLM calls (see airflow-ai).
Pause a DAG until a human responds via the Airflow UI or REST API. HITL operators are deferrable — they release their worker slot while waiting.
Requires Airflow 3.1+ (`uvx --from astro-airflow-mcp af config version`).

UI location: Browse → Required Actions. Respond from the task instance page's Required Actions tab.
Cross-references: airflow-ai for AI/LLM task decorators; airflow for registry and API discovery commands used below.
| Capability | Class (verify in Step 2) |
|---|---|
| Approve or reject; downstream skips on reject | ApprovalOperator |
| Present N options and return which were chosen | HITLOperator |
| Branch to one or more downstream tasks based on a choice | HITLBranchOperator |
| Collect a form (no approve/select step) | HITLEntryOperator |
| Use the HITL trigger directly (advanced / custom operators) | HITLTrigger |
This is the only place class names are hardcoded. The provider adds, renames, and removes params across releases — do not copy parameter lists from memory. Fetch the current signature before writing code.
Before writing HITL code, run these to see the live roster and constructor params (see the airflow skill for the full af registry reference):
# Every HITL-related module in the standard provider
uvx --from astro-airflow-mcp af registry modules standard \
  | jq '.modules[] | select(.import_path | test("\\.hitl\\.")) | {name, type, import_path, short_description, docs_url}'

# Constructor signatures: name, type, default, required, description
uvx --from astro-airflow-mcp af registry parameters standard \
  | jq '.classes | to_entries[] | select(.key | test("\\.hitl\\.")) | {fqn: .key, parameters: .value.parameters}'

# Pin to the exact installed provider version
uvx --from astro-airflow-mcp af config providers \
  | jq '.providers[] | select(.package_name == "apache-airflow-providers-standard") | .version'
# then: af registry parameters standard --version <VERSION>
If the registry shows a param that this skill does not mention, prefer the registry. If the registry shows a class that is not in Step 1, treat it as additive — the decision table above may be stale.
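The "treat it as additive" rule can be checked mechanically. The sketch below is a minimal example, assuming the JSON fed to the first `jq` command has a top-level `modules` list whose entries carry `name` and `import_path` (as that filter implies). `KNOWN_CLASSES` and `unlisted_hitl_classes` are hypothetical names, not part of Airflow or the `af` CLI, and the sample payload is made up.

```python
import re

# Class names from the Step 1 decision table.
KNOWN_CLASSES = {
    "ApprovalOperator",
    "HITLOperator",
    "HITLBranchOperator",
    "HITLEntryOperator",
    "HITLTrigger",
}


def unlisted_hitl_classes(registry: dict) -> list:
    """Return HITL class names present in the registry but missing from Step 1."""
    found = {
        module["name"]
        for module in registry.get("modules", [])
        # Same match as the jq filter: ".hitl." somewhere in the import path.
        if re.search(r"\.hitl\.", module.get("import_path", ""))
    }
    return sorted(found - KNOWN_CLASSES)


# Made-up registry payload, for illustration only.
sample = {
    "modules": [
        {
            "name": "ApprovalOperator",
            "import_path": "airflow.providers.standard.operators.hitl.ApprovalOperator",
        },
        {
            "name": "HypotheticalHITLSensor",
            "import_path": "airflow.providers.standard.sensors.hitl.HypotheticalHITLSensor",
        },
        {
            "name": "BashOperator",
            "import_path": "airflow.providers.standard.operators.bash.BashOperator",
        },
    ]
}
print(unlisted_hitl_classes(sample))  # ['HypotheticalHITLSensor']
```

Any name this returns is a candidate addition to the decision table; look up its constructor through the parameters command before using it.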
Starting point for any HITL task. Adapt by swapping the class name and params per Step 2.
from airflow.providers.standard.operators.hitl import ApprovalOperator
from airflow.sdk import dag, task, chain, Param
from pendulum import datetime


@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def approval_example():
    @task
    def prepare():
        return "Review quarterly report"

    approval = ApprovalOperator(
        task_id="approve_report",
        subject="Report Approval",
        body="{{ ti.xcom_pull(task_ids='prepare') }}",
        defaults="Approve",  # Auto-selected on timeout
        params={"comments": Param("", type="string")},
    )

    @task
    def after_approval(result):
        print(f"Decision: {result['chosen_options']}")

    chain(prepare(), approval)
    after_approval(approval.output)


approval_example()
For the other classes in Step 1, the shape is the same (task_id, subject, plus class-specific params). Verify each constructor through Step 2 — for example, HITLBranchOperator requires every option either to match a downstream task id directly or to be resolved via a mapping param surfaced in the registry.
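The HITLBranchOperator constraint can be expressed as a check to run before deploying a DAG. This is a minimal sketch in plain Python: `unresolved_options` is a hypothetical helper, and its `mapping` argument stands in for whatever mapping param the registry surfaces (the real name is not assumed here).

```python
from __future__ import annotations


def unresolved_options(
    options: list[str],
    downstream_task_ids: set[str],
    mapping: dict[str, str] | None = None,
) -> list[str]:
    """Return the options that do not resolve to any downstream task id."""
    mapping = mapping or {}
    return [
        option
        for option in options
        # An option resolves through the mapping if listed there,
        # otherwise it must itself be a downstream task id.
        if mapping.get(option, option) not in downstream_task_ids
    ]


downstream = {"deploy", "rollback"}
print(unresolved_options(["deploy", "Roll back"], downstream))
# ['Roll back']
print(unresolved_options(["deploy", "Roll back"], downstream, {"Roll back": "rollback"}))
# []
```

An empty result means every option has somewhere to go. Anything left over needs either a matching task id or a mapping entry.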
- `defaults` set: task succeeds on timeout, with the default option(s) selected.
- `defaults` not set: task fails on timeout.

`body` supports Markdown and is Jinja-templatable. Render XCom context directly:
body = """**Total Budget:** {{ ti.xcom_pull(task_ids='get_budget') }}
| Category | Amount |
|----------|--------|
| Marketing | $1M |
"""
All HITL operators accept the standard Airflow callback kwargs (on_success_callback, on_failure_callback, etc.).
HITL operators accept a notifiers list. Inside a notifier's notify(context) method, build a link to the pending task with HITLOperator.generate_link_to_ui_from_context(context, base_url=...).
The parameter name and accepted identifier format depend on the active auth manager. Do not hardcode — check which one is active and which kwarg the current provider exposes:
uvx --from astro-airflow-mcp af config show | jq '.auth_manager // .core.auth_manager'
Then look up the current kwarg in Step 2 (at the time of writing it is assigned_users, accepting identifiers in whatever format the active auth manager uses — Astro uses the Astro user ID, FabAuthManager uses email, SimpleAuthManager uses username).
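The per-auth-manager formats listed above can be kept in one place so DAG code does not scatter them. This is a sketch under assumptions: the auth manager value is taken to be a class path containing the manager's name, the `"astro"` marker is a guess at how the Astro manager would appear, and the user-record keys (`email`, `username`, `astro_id`) are hypothetical.

```python
# Which user attribute each auth manager expects, per the note above.
# Markers are matched case-insensitively as substrings of the configured value.
IDENTIFIER_FIELD = {
    "fabauthmanager": "email",
    "simpleauthmanager": "username",
    "astro": "astro_id",  # assumption: marker and key name are both hypothetical
}


def assignee_identifiers(auth_manager: str, users: list) -> list:
    """Pick the identifier each user needs for the active auth manager."""
    configured = auth_manager.lower()
    for marker, field in IDENTIFIER_FIELD.items():
        if marker in configured:
            return [user[field] for user in users]
    raise ValueError(f"Unrecognised auth manager: {auth_manager!r}")


users = [{"email": "a@example.com", "username": "alice", "astro_id": "id-123"}]
fab = "airflow.providers.fab.auth_manager.fab_auth_manager.FabAuthManager"
print(assignee_identifiers(fab, users))  # ['a@example.com']
```

The returned list is what would be passed to the kwarg found in Step 2. Raising on an unknown manager is deliberate, so a new auth manager is noticed rather than silently given the wrong identifier type.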
For Slack bots, custom apps, or scripts. Discover the live endpoint rather than hardcoding a path:
uvx --from astro-airflow-mcp af api ls --filter hitl # live endpoint list
uvx --from astro-airflow-mcp af api spec \
| jq '.paths | to_entries[] | select(.key | test("hitl"))' # request/response schemas
The PATCH-to-respond pattern is stable; the exact path is discovered. Typical shape:
import os

import requests

HOST = os.environ["AIRFLOW_HOST"]
TOKEN = os.environ["AIRFLOW_API_TOKEN"]
HEADERS = {"Authorization": f"Bearer {TOKEN}"}

# Task instance to respond to (placeholders; substitute real values)
dag_id, run_id, task_id = "approval_example", "<run_id>", "approve_report"

# List pending: use the path from `af api ls --filter hitl`
pending = requests.get(f"{HOST}/<path>", headers=HEADERS, params={"state": "pending"})

# Respond: same discovered path family, PATCH
requests.patch(
    f"{HOST}/<path>/{dag_id}/{run_id}/{task_id}",
    headers=HEADERS,
    json={"chosen_options": ["Approve"], "params_input": {"comments": "ok"}},
)
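Run IDs often contain characters such as `:` and `+`, so it can help to percent-encode each path segment before issuing the PATCH. The sketch below only builds the URL and JSON body and makes no network call. `build_hitl_response` is a hypothetical helper, `<path>` stays a placeholder for the discovered path, and the path layout and payload keys mirror the "typical shape" above rather than a verified schema.

```python
from __future__ import annotations

from urllib.parse import quote


def build_hitl_response(
    host: str,
    path: str,
    dag_id: str,
    run_id: str,
    task_id: str,
    chosen_options: list[str],
    params_input: dict | None = None,
) -> tuple[str, dict]:
    """Return (url, json_body) for a PATCH that answers a pending HITL task."""
    # Encode each identifier so characters like ':' and '+' survive in the URL path.
    segments = "/".join(quote(part, safe="") for part in (dag_id, run_id, task_id))
    url = f"{host.rstrip('/')}/{path.strip('/')}/{segments}"
    body = {"chosen_options": chosen_options, "params_input": params_input or {}}
    return url, body


url, body = build_hitl_response(
    "https://airflow.example.com",  # illustrative host
    "<path>",  # replace with the path from `af api ls --filter hitl`
    "approval_example",
    "manual__2025-01-01T00:00:00+00:00",
    "approve_report",
    ["Approve"],
    {"comments": "ok"},
)
print(url)
```

The pair can then be passed to `requests.patch(url, headers=HEADERS, json=body)`.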
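- Airflow 3.1+ confirmed (`af config version`).
- Constructor params verified against the registry (Step 2) to avoid `respondents`-vs-`assigned_users` style drift.
- Every value in `defaults` is also in `options`.
- `execution_timeout` set; `defaults` configured if timeout should succeed rather than fail.

The upstream docs URL is surfaced per-module by the registry; do not hardcode it: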
uvx --from astro-airflow-mcp af registry modules standard \
| jq '.modules[] | select(.import_path | test("\\.hitl\\.")) | {name, docs_url}'
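The checklist items about `defaults`, `options`, and `execution_timeout` can be sketched as a pre-flight check run against the values destined for the operator. `hitl_preflight` is a hypothetical helper in plain Python. It does not import Airflow, and its argument names simply mirror the params named in this skill.

```python
from __future__ import annotations

from datetime import timedelta


def hitl_preflight(
    options: list[str],
    defaults: list[str] | None,
    execution_timeout: timedelta | None,
) -> list[str]:
    """Return human-readable problems; an empty list means the checks pass."""
    problems = []
    # Every default must be one of the offered options.
    stray = [d for d in (defaults or []) if d not in options]
    if stray:
        problems.append(f"defaults not in options: {stray}")
    # Without a timeout the task waits indefinitely for a response.
    if execution_timeout is None:
        problems.append("execution_timeout is not set")
    return problems


print(hitl_preflight(["Approve", "Reject"], ["Approve"], timedelta(hours=4)))
# []
print(hitl_preflight(["Approve", "Reject"], ["Escalate"], None))
# ["defaults not in options: ['Escalate']", 'execution_timeout is not set']
```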
See the airflow skill for the `af registry`, `af api`, and `af config` command reference.