skills/data-engineering/airflow/SKILL.md
Open-source platform for authoring, scheduling, and monitoring data pipelines programmatically.
Airflow is an open-source workflow orchestration tool for defining, scheduling, and monitoring data pipelines as code. It uses Python to create Directed Acyclic Graphs (DAGs) that represent task dependencies and execution flows.
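To make the DAG idea concrete without an Airflow install, the sketch below uses Python's standard-library `graphlib` to order tasks by their dependencies. The task names (`extract`, `transform`, `load`, `report`) and the helper `execution_order` are hypothetical. This only illustrates the ordering concept and is not how Airflow's scheduler is implemented.

```python
from graphlib import TopologicalSorter

# Hypothetical pipeline: each key maps to the set of tasks it depends on.
dependencies = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
    "report": {"load"},
}

def execution_order(deps):
    """Return one valid run order in which every task follows its dependencies."""
    return list(TopologicalSorter(deps).static_order())

order = execution_order(dependencies)
print(order)
```

A cyclic dependency (for example, two tasks that each wait on the other) makes `static_order()` raise `graphlib.CycleError`, which is the "acyclic" constraint in practice.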
Use Airflow for scenarios involving recurring data tasks, such as ETL processes, batch jobs, or complex workflows with dependencies. It's ideal when you need dynamic scheduling, retries, and monitoring in data engineering pipelines, especially for production-scale operations with tools like Spark or databases.
- Web UI: served at `/admin/`.
- Operators: `BashOperator` for shell commands or `PythonOperator` for custom functions.
- Hooks: `PostgresHook` for database connections.
- Configuration: the `airflow.cfg` file, e.g., set `[core] executor = LocalExecutor` for local testing.

To use Airflow, install it via `pip install apache-airflow`, then initialize the database with `airflow db init`. Define DAGs in the `dags` folder of your Airflow home directory. Always use a virtual environment to avoid dependency conflicts. When running Airflow with Docker Compose, set `$AIRFLOW_UID` so that files written to mounted volumes are owned by your host user; this variable controls file ownership and does not provide authentication.
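Settings in `airflow.cfg` can also be overridden with environment variables named `AIRFLOW__{SECTION}__{KEY}`, which is often more convenient in containers. A minimal sketch of that naming rule follows; the helper `airflow_env_name` is a hypothetical name for illustration and is not part of Airflow.

```python
import os

def airflow_env_name(section: str, key: str) -> str:
    """Build the environment variable name that overrides [section] key in airflow.cfg."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"

# Equivalent to setting `executor = LocalExecutor` under [core] in airflow.cfg.
name = airflow_env_name("core", "executor")
os.environ[name] = "LocalExecutor"
print(name)
```

Setting `os.environ` this way only affects the current process and any child processes it starts; for a long-running deployment the variable would normally be exported by the shell or the container definition.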
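```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

# Daily backup DAG. start_date is required before runs are scheduled; the date here is a placeholder.
dag = DAG('daily_backup', start_date=datetime(2024, 1, 1), schedule_interval='@daily', catchup=False)
task = BashOperator(task_id='backup', bash_command='mysqldump db > backup.sql', dag=dag)

# Spark job on the same DAG (needs the apache-airflow-providers-apache-spark package).
spark_task = SparkSubmitOperator(task_id='spark_job', application='/path/to/script.py', dag=dag)
```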
Run Airflow from the command line after setting up your environment. Set `$AIRFLOW__CORE__FERNET_KEY` if Connection passwords and Variables should be encrypted in the metadata database.
CLI Commands:
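- `airflow db init`
- `airflow webserver --port 8080`
- `airflow scheduler`
- `airflow dags trigger my_dag --conf '{"key":"value"}'`
- `airflow dags list`

API Endpoints (via the REST API, whose auth backend is set in `airflow.cfg`, e.g. `[api] auth_backend = airflow.api.auth.backend.default`; note that this `default` backend accepts requests without credentials, while `airflow.api.auth.backend.basic_auth` requires a username and password):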
- `GET /api/v1/dags` to list all DAGs; requires authentication via an API token set as `$AIRFLOW_API_TOKEN`.
- `POST /api/v1/dags/{dag_id}/dagRuns` to trigger a DAG run, e.g., with JSON payload `{"conf": {"param": "value"}}`.

```python
import os
import requests

# Token is read from the environment; the configured auth backend must accept bearer tokens.
response = requests.get('http://localhost:8080/api/v1/dags', headers={'Authorization': f'Bearer {os.environ["AIRFLOW_API_TOKEN"]}'})
print(response.json())
```
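The trigger endpoint can be exercised the same way. The sketch below only builds the `POST` request with the standard library so it can be inspected without a running webserver. The helper name `build_trigger_request`, the DAG id `my_dag`, and the token value are assumptions for illustration, and the bearer header assumes the deployment is configured to accept such tokens.

```python
import json
import urllib.request

def build_trigger_request(base_url, dag_id, conf, token):
    """Build (but do not send) a POST request that triggers a DAG run."""
    url = f"{base_url.rstrip('/')}/api/v1/dags/{dag_id}/dagRuns"
    body = json.dumps({"conf": conf}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={
            "Content-Type": "application/json",
            # Assumes the deployment's auth backend accepts bearer tokens.
            "Authorization": f"Bearer {token}",
        },
    )

req = build_trigger_request("http://localhost:8080", "my_dag", {"param": "value"}, "dummy-token")
print(req.get_method(), req.full_url)
# To actually send it: urllib.request.urlopen(req)
```

Building the request separately from sending it keeps the URL and payload easy to check in a unit test before pointing the code at a real scheduler.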
Integrate Airflow with other tools via hooks and operators. For secrets, use Airflow's Variables or Connections, stored in the metadata database. Set environment variables like $AIRFLOW_CONN_POSTGRES_DEFAULT for database connections (e.g., postgresql://user:pass@localhost/db).
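Connection URIs passed through `AIRFLOW_CONN_*` variables need their credentials percent-encoded when they contain characters such as `@` or `/`. A minimal sketch, assuming hypothetical helper names (`postgres_conn_uri`, `conn_env_name`) and made-up credentials:

```python
from urllib.parse import quote

def postgres_conn_uri(user, password, host, database, port=5432):
    """Build a connection URI, percent-encoding the credentials."""
    return f"postgresql://{quote(user, safe='')}:{quote(password, safe='')}@{host}:{port}/{database}"

def conn_env_name(conn_id):
    """Environment variable name Airflow reads for the given connection id."""
    return f"AIRFLOW_CONN_{conn_id.upper()}"

uri = postgres_conn_uri("user", "p@ss/word", "localhost", "db")
env_name = conn_env_name("postgres_default")
print(f"{env_name}={uri}")
```

Without the encoding step, the `@` inside the password would be read as the separator between credentials and host, and the connection would point at the wrong server.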
- Spark: use `SparkSubmitOperator` and set executor configs in the operator, e.g., `conf={"spark.executor.memory": "4g"}`.
- AWS S3: use `S3Hook` for file operations; set `$AWS_ACCESS_KEY_ID` and `$AWS_SECRET_ACCESS_KEY` as env vars.
- Kubernetes: set `[kubernetes] namespace = default` in `airflow.cfg` and use `KubernetesPodOperator`.

Handle errors by configuring retries in task definitions, e.g., `retries=3, retry_delay=timedelta(minutes=5)`. Check logs via the Web UI or in the task log files under `$AIRFLOW_HOME/logs`. Use `on_failure_callback` in DAGs to trigger alerts.
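Retry settings are often shared across tasks by passing a `default_args` dictionary to the DAG. The sketch below shows such a dictionary with the values mentioned above and works out the longest time a task could spend waiting between attempts. The helper `max_retry_wait` is hypothetical and assumes a fixed delay with no exponential backoff.

```python
from datetime import timedelta

# Settings applied to every task when passed as DAG(..., default_args=default_args).
default_args = {
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
}

def max_retry_wait(args):
    """Upper bound on time spent waiting between attempts (fixed delay, no backoff)."""
    return args["retries"] * args["retry_delay"]

print(max_retry_wait(default_args))
```

Knowing this bound helps when choosing alerting thresholds: a task with these settings may not report a final failure until at least that long after its first attempt fails.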
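- Email alerts: set `email_on_failure=True` on the task and configure `[smtp] smtp_host = your.smtp.server` in `airflow.cfg`.

```python
from airflow.operators.python import PythonOperator
from airflow.utils.email import send_email

# my_function is your own callable; 'alerts@example.com' is a placeholder address.
task = PythonOperator(task_id='failing_task', python_callable=my_function, on_failure_callback=lambda context: send_email('alerts@example.com', 'Task Failed', 'Error details'))
```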
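A failure callback receives a context dictionary, so the alert can name the task that failed instead of sending a fixed string. The sketch below formats such a message and can be tried outside Airflow. The function name `failure_message` and the stand-in context are assumptions for illustration; the keys used (`task_instance`, `exception`) are ones Airflow includes in the callback context.

```python
from types import SimpleNamespace

def failure_message(context):
    """Format a (subject, body) pair from the context dict passed to a failure callback."""
    ti = context["task_instance"]
    subject = f"Airflow task failed: {ti.dag_id}.{ti.task_id}"
    body = f"Attempt {ti.try_number} failed with: {context.get('exception')!r}"
    return subject, body

# Stand-in context for trying the function outside Airflow.
fake_context = {
    "task_instance": SimpleNamespace(dag_id="daily_backup", task_id="backup", try_number=1),
    "exception": ValueError("disk full"),
}
subject, body = failure_message(fake_context)
print(subject)
print(body)
```

Inside a real callback, the returned subject and body could be handed to `send_email` in place of the hard-coded strings.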