skills/airflow-dag-orchestrator/SKILL.md
Apache Airflow DAGs, operators, SLA monitoring, and workflow orchestration. Activate on: Airflow, DAG, operator, sensor, scheduler, task dependency, SLA, backfill, XCom. NOT for: dbt transformations (use dbt-analytics-engineer), streaming pipelines (use streaming-pipeline-architect).
Install this skill globally with one command (works with Claude Code, Cursor, and Windsurf): `npx skillsauth add curiositech/windags-skills airflow-dag-orchestrator`
Design and operate Apache Airflow DAGs for reliable data pipeline orchestration with proper dependency management, SLAs, and monitoring.
Activate on: "Airflow", "DAG", "operator", "sensor", "scheduler", "task dependency", "SLA", "backfill", "XCom", "TaskFlow API", "MWAA", "Cloud Composer"
NOT for: dbt model execution → dbt-analytics-engineer (though Airflow can trigger dbt) | Stream processing → streaming-pipeline-architect | Workflow engine (Temporal) → distributed-transaction-manager
- TaskFlow API (`@dag`, `@task` decorators) for Python-native DAGs
- `catchup=False` unless backfill is intentional
- `retries=2, retry_delay=timedelta(minutes=5)` on every task
- `sla=timedelta(hours=2)` on critical-path tasks
- `airflow dags test my_dag 2026-01-01` before deploying

| Domain | Technologies |
|--------|-------------|
| Airflow | Apache Airflow 2.10+, MWAA, Cloud Composer 3 |
| Operators | BashOperator, PythonOperator, KubernetesPodOperator |
| Providers | apache-airflow-providers-{snowflake, google, amazon, dbt-cloud} |
| Executors | CeleryExecutor, KubernetesExecutor, LocalExecutor |
| Monitoring | SLA misses, task duration, Airflow metrics → Prometheus |
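With `catchup=True`, deploying a DAG whose `start_date` is in the past queues one run per completed interval since that date. The sketch below counts those runs for a DAG first deployed two months after its `start_date`. It uses only the standard library and does not replicate the Airflow scheduler. It assumes a fixed daily interval (like `@daily`) and ignores cron offsets and time zones.

```python
from datetime import datetime, timedelta


def catchup_logical_dates(start_date: datetime, now: datetime,
                          interval: timedelta = timedelta(days=1)) -> list[datetime]:
    """Logical dates a fixed-interval DAG would queue with catchup=True.

    Simplified model (assumption): the run for [d, d + interval) becomes
    eligible only once d + interval has passed.
    """
    dates = []
    logical = start_date
    while logical + interval <= now:
        dates.append(logical)
        logical += interval
    return dates


start = datetime(2026, 1, 1)
first_deploy = datetime(2026, 3, 1, 12, 0)  # hypothetical deploy time
missed = catchup_logical_dates(start, first_deploy)
print(len(missed), missed[0].date(), missed[-1].date())  # → 59 2026-01-01 2026-02-28
```

With `catchup=False`, the scheduler creates a run only for the latest interval. Backfilling the rest then stays an explicit, intentional step.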
```python
from datetime import datetime, timedelta

from airflow.decorators import dag, task

# stripe_client, shopify_client, run_duckdb_transform and snowflake_copy_into are
# placeholders for project-specific helpers (not shown).


@dag(
    schedule="0 6 * * *",  # daily at 06:00 UTC
    start_date=datetime(2026, 1, 1),
    catchup=False,
    default_args={
        "retries": 2,
        "retry_delay": timedelta(minutes=5),
        # default_args applies this SLA to every task; pass @task(sla=...) instead
        # to limit it to critical-path tasks
        "sla": timedelta(hours=2),
    },
    tags=["finance", "daily"],
)
def daily_revenue_pipeline():
    @task()
    def extract_payments(ds=None) -> dict:
        """Extract from Stripe API for the run's logical date."""
        # Airflow injects `ds` (YYYY-MM-DD). Unlike today(), it stays the same
        # on retries and backfills, which keeps the task idempotent.
        data = stripe_client.list_payments(date=ds)
        return {"count": len(data), "path": "s3://raw/payments/"}

    @task()
    def extract_orders(ds=None) -> dict:
        """Extract from Shopify API for the run's logical date."""
        data = shopify_client.list_orders(date=ds)
        return {"count": len(data), "path": "s3://raw/orders/"}

    @task()
    def transform(payments: dict, orders: dict) -> str:
        """Join and transform in DuckDB."""
        result_path = run_duckdb_transform(payments["path"], orders["path"])
        return result_path

    @task()
    def load(path: str):
        """Load to Snowflake."""
        snowflake_copy_into("fct_revenue", path)

    # Define dependencies via function calls; return values are passed via XCom
    payments = extract_payments()
    orders = extract_orders()
    transformed = transform(payments, orders)
    load(transformed)


# Calling the decorated function registers the DAG
daily_revenue_pipeline()
```
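TaskFlow infers edges from data flow. Passing `payments` and `orders` into `transform` makes both extracts upstream of it. The sketch below groups the resulting graph into waves using Python's `graphlib`. It is an illustration only, not Airflow's scheduler. Tasks in one wave have no dependency on each other, so the two extracts can run concurrently when worker slots are free, while `transform` waits for both.

```python
from graphlib import TopologicalSorter

# Upstream dependencies implied by the function calls in daily_revenue_pipeline
upstream = {
    "extract_payments": set(),
    "extract_orders": set(),
    "transform": {"extract_payments", "extract_orders"},
    "load": {"transform"},
}


def execution_waves(graph: dict[str, set[str]]) -> list[list[str]]:
    """Group tasks into waves; tasks in the same wave do not depend on each other."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)  # mark the whole wave finished before asking for the next
    return waves


for i, wave in enumerate(execution_waves(upstream), start=1):
    print(i, wave)
# 1 ['extract_orders', 'extract_payments']
# 2 ['transform']
# 3 ['load']
```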
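```python
import logging
from datetime import datetime

from airflow.decorators import dag, task

log = logging.getLogger(__name__)


# Hypothetical wrapper DAG: mapped tasks must be defined inside a DAG
@dag(schedule=None, start_date=datetime(2026, 1, 1), catchup=False)
def partitioned_pipeline():
    @task()
    def get_partitions() -> list[str]:
        return ["2026-01-01", "2026-01-02", "2026-01-03"]

    @task()
    def process_partition(partition_date: str) -> dict:
        """Runs in parallel for each partition; process() is a placeholder (not shown)."""
        return {"date": partition_date, "rows": process(partition_date)}

    @task()
    def aggregate(results: list[dict]):
        """Fan-in: receives all partition results."""
        total = sum(r["rows"] for r in results)
        log.info("Processed %d total rows", total)

    # Dynamically maps process_partition across all partitions at runtime
    partitions = get_partitions()
    results = process_partition.expand(partition_date=partitions)
    aggregate(results)


partitioned_pipeline()
```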
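At runtime, `expand()` creates one mapped task instance per element of `partitions`, and `aggregate` receives all of their results. The same fan-out/fan-in shape can be shown in plain Python. This is a rough analogy only: it uses threads, which is not how Airflow executes mapped tasks. `process_partition` here is a hypothetical stand-in that fakes row counts.

```python
from concurrent.futures import ThreadPoolExecutor


def process_partition(partition_date: str) -> dict:
    """Hypothetical stand-in for real per-partition work: fakes a row count from the day."""
    return {"date": partition_date, "rows": int(partition_date[-2:]) * 100}


def aggregate(results: list[dict]) -> int:
    """Fan-in: combine every per-partition result into one total."""
    return sum(r["rows"] for r in results)


partitions = ["2026-01-01", "2026-01-02", "2026-01-03"]

# Fan-out: one call per partition, like one mapped task instance each.
# pool.map returns results in input order even if calls finish out of order.
with ThreadPoolExecutor(max_workers=2) as pool:
    results = list(pool.map(process_partition, partitions))

print(aggregate(results))  # → 600
```

In Airflow itself, concurrency of mapped instances is bounded by pools, executor slots, and operator arguments such as `max_active_tis_per_dag`, not by a thread pool.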
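```python
from datetime import datetime

from airflow.decorators import dag
from airflow.operators.bash import BashOperator
from cosmos import DbtDag, ProfileConfig, ProjectConfig

# Option 1: cosmos (recommended), renders dbt models as individual Airflow tasks
dbt_dag = DbtDag(
    dag_id="dbt_daily",
    project_config=ProjectConfig("/opt/airflow/dbt/"),
    profile_config=ProfileConfig(
        profile_name="default",
        target_name="prod",
        # ProfileConfig needs profiles_yml_filepath or profile_mapping; this path is assumed
        profiles_yml_filepath="/opt/airflow/dbt/profiles.yml",
    ),
    schedule="@daily",
    start_date=datetime(2026, 1, 1),
    catchup=False,
)


# Option 2: BashOperator (simple), runs the whole selection as one task.
# The wrapping DAG name dbt_daily_bash is a hypothetical example.
@dag(schedule="@daily", start_date=datetime(2026, 1, 1), catchup=False)
def dbt_daily_bash():
    BashOperator(
        task_id="dbt_run",
        bash_command="cd /opt/airflow/dbt && dbt build --select tag:daily",
    )


dbt_daily_bash()
```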
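The practical difference between the two options is granularity. Cosmos turns each dbt model into its own Airflow task (by default with its tests attached), so a failure retries only that model. The BashOperator reruns the entire `dbt build` selection. The sketch below shows the underlying idea: deriving a per-model dependency graph from dbt's `manifest.json`. The manifest fragment is hypothetical and trimmed, and this is not cosmos's actual implementation.

```python
from graphlib import TopologicalSorter

# Trimmed, hypothetical manifest fragment. In a real project this dict would come
# from json.load() on dbt's target/manifest.json, which has many more fields.
MANIFEST = {
    "nodes": {
        "model.shop.stg_orders": {
            "resource_type": "model",
            "depends_on": {"nodes": ["source.shop.raw.orders"]},
        },
        "model.shop.stg_payments": {
            "resource_type": "model",
            "depends_on": {"nodes": ["source.shop.raw.payments"]},
        },
        "model.shop.fct_revenue": {
            "resource_type": "model",
            "depends_on": {"nodes": ["model.shop.stg_orders", "model.shop.stg_payments"]},
        },
        "test.shop.not_null_fct_revenue_id": {
            "resource_type": "test",
            "depends_on": {"nodes": ["model.shop.fct_revenue"]},
        },
    }
}


def model_dependencies(manifest: dict) -> dict[str, set[str]]:
    """Map each dbt model to its upstream models; sources and tests are dropped."""
    nodes = manifest["nodes"]
    models = {uid for uid, node in nodes.items() if node["resource_type"] == "model"}
    return {
        uid: {dep for dep in nodes[uid]["depends_on"]["nodes"] if dep in models}
        for uid in models
    }


deps = model_dependencies(MANIFEST)
# One task per model, ordered so upstream models run first
order = list(TopologicalSorter(deps).static_order())
print(order[-1])  # → model.shop.fct_revenue
```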
- `catchup=False` unless backfill is intentional; otherwise Airflow schedules a run for every interval missed since `start_date`
- `retries >= 1` with a `retry_delay` on every task
- `airflow dags test` before deployment
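Retries and SLAs interact. `retries` counts additional attempts, so `retries=2` allows up to three tries, and each retry waits `retry_delay` first. The back-of-envelope check below tests whether a task's worst case still fits the 2-hour SLA used above. It assumes a hypothetical 30-minute task, that every attempt runs its full length, a fixed delay (no `retry_exponential_backoff`), and no queueing time. Airflow 2 measures a task SLA from the DAG run's start, so upstream task time counts against it too.

```python
from datetime import timedelta


def worst_case_duration(attempt: timedelta, retries: int, retry_delay: timedelta) -> timedelta:
    """Wall-clock time if every attempt runs its full length and only the last succeeds.

    Assumes a fixed retry_delay (no retry_exponential_backoff) and ignores queueing.
    """
    return (retries + 1) * attempt + retries * retry_delay


sla = timedelta(hours=2)
# Hypothetical 30-minute task with the retry settings recommended above
budget = worst_case_duration(timedelta(minutes=30), retries=2, retry_delay=timedelta(minutes=5))
print(budget, budget <= sla)  # → 1:40:00 True
```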