.claude/skills/ts-airflow/SKILL.md
Apache Airflow is a platform for programmatically authoring, scheduling, and monitoring workflows. Learn to write DAGs, use operators, set up connections, configure scheduling, and deploy with Docker Compose.
Apache Airflow lets you define workflows as Directed Acyclic Graphs (DAGs) in Python. Each DAG consists of tasks connected by dependencies, scheduled and monitored via a web UI.
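To make the dependency idea concrete, here is a minimal sketch that uses only Python's standard-library `graphlib`, not Airflow itself. The task names and the `execution_order` helper are illustrative assumptions. The sketch only shows that declaring dependencies is enough to derive a valid run order, and that a cycle is rejected.

```python
from graphlib import TopologicalSorter

# Map each task to the set of tasks it depends on (its upstream tasks).
# The task names are hypothetical, chosen only for illustration.
dependencies = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
}


def execution_order(deps):
    """Return one valid run order; raises graphlib.CycleError if deps contain a cycle."""
    return list(TopologicalSorter(deps).static_order())


order = execution_order(dependencies)
print(order)  # ['extract', 'transform', 'load']
```

Airflow's scheduler does considerably more than this (scheduling intervals, retries, state tracking), but the ordering constraint it enforces is the same kind of topological order.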
# docker-compose.yml: Airflow with LocalExecutor (simplified)
services:
  postgres:
    image: postgres:16
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-data:/var/lib/postgresql/data

  airflow-webserver:
    image: apache/airflow:2.9.0
    depends_on: [postgres]
    environment:
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
      AIRFLOW__CORE__FERNET_KEY: ''
      AIRFLOW__WEBSERVER__SECRET_KEY: changeme
    volumes:
      - ./dags:/opt/airflow/dags
    ports:
      - "8080:8080"
    # admin@example.com is a placeholder address; substitute your own
    command: bash -c "airflow db migrate && airflow users create --username admin --password admin --firstname Admin --lastname User --role Admin --email admin@example.com && airflow webserver"

  airflow-scheduler:
    image: apache/airflow:2.9.0
    depends_on: [postgres]
    environment:
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    volumes:
      - ./dags:/opt/airflow/dags
    command: airflow scheduler

volumes:
  postgres-data:
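The compose file above leaves `AIRFLOW__CORE__FERNET_KEY` empty and sets `AIRFLOW__WEBSERVER__SECRET_KEY` to `changeme`, which is acceptable for a local demo only. The following is a minimal sketch of generating replacement values with the standard library. It assumes the usual Fernet key format (32 random bytes, URL-safe base64-encoded); the helper names are hypothetical.

```python
import base64
import os
import secrets


def generate_fernet_key() -> str:
    """Return 32 random bytes, URL-safe base64-encoded (the format Fernet keys use)."""
    return base64.urlsafe_b64encode(os.urandom(32)).decode("ascii")


def generate_secret_key() -> str:
    """Return a random 64-character hex string for use as a web session secret."""
    return secrets.token_hex(32)


if __name__ == "__main__":
    print("AIRFLOW__CORE__FERNET_KEY:", generate_fernet_key())
    print("AIRFLOW__WEBSERVER__SECRET_KEY:", generate_secret_key())
```

If you adopt generated values, the same Fernet key would presumably need to be set on both the webserver and the scheduler service, since both read encrypted connection credentials from the shared database.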
# Start Airflow
docker compose up -d
# UI at http://localhost:8080 (admin/admin)
# dags/hello_world.py: Simple DAG with PythonOperator
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

default_args = {
    'owner': 'data-team',
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='hello_world',
    default_args=default_args,
    description='A simple hello world DAG',
    schedule='@daily',
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=['example'],
) as dag:

    def extract(**kwargs):
        import requests
        data = requests.get('https://api.example.com/data').json()
        kwargs['ti'].xcom_push(key='raw_data', value=data)

    def transform(**kwargs):
        data = kwargs['ti'].xcom_pull(key='raw_data', task_ids='extract')
        transformed = [{'id': d['id'], 'value': d['amount'] * 100} for d in data]
        kwargs['ti'].xcom_push(key='transformed', value=transformed)

    extract_task = PythonOperator(task_id='extract', python_callable=extract)
    transform_task = PythonOperator(task_id='transform', python_callable=transform)
    load_task = BashOperator(task_id='load', bash_command='echo "Loading data..."')

    extract_task >> transform_task >> load_task
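One way to keep a DAG like this testable is to move the data logic into a plain function that the task callable merely wraps. The sketch below mirrors the `transform` step above without importing Airflow. The function name `transform_rows` and the sample payload are assumptions made for illustration; the real API response shape is not specified in the example.

```python
def transform_rows(rows):
    """Mirror the DAG's transform step: keep `id` and scale `amount` by 100."""
    return [{"id": row["id"], "value": row["amount"] * 100} for row in rows]


# Hypothetical sample payload standing in for the API response.
sample = [{"id": 1, "amount": 2}, {"id": 2, "amount": 15}]

result = transform_rows(sample)
print(result)  # [{'id': 1, 'value': 200}, {'id': 2, 'value': 1500}]
```

Inside the DAG, the `transform` callable could then pull `raw_data` from XCom, call `transform_rows`, and push the result, leaving the logic itself covered by ordinary unit tests.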
# dags/taskflow_etl.py: Modern TaskFlow API with decorators
from datetime import datetime

from airflow.decorators import dag, task


@dag(
    schedule='@daily',
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=['etl'],
)
def taskflow_etl():

    @task()
    def extract():
        return {'users': 100, 'revenue': 50000}

    @task()
    def transform(data: dict):
        return {
            'users': data['users'],
            'avg_revenue': data['revenue'] / data['users'],
        }

    @task()
    def load(summary: dict):
        print(f"Users: {summary['users']}, Avg Revenue: {summary['avg_revenue']}")

    raw = extract()
    transformed = transform(raw)
    load(transformed)


taskflow_etl()
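With TaskFlow, each return value travels to the next task through XCom, which by default serializes values as JSON. The sketch below approximates that hand-off with a JSON round trip so you can check, without Airflow, that the values in the example survive it. The `roundtrip` helper is hypothetical and only stands in for the XCom backend.

```python
import json


def extract():
    return {"users": 100, "revenue": 50000}


def transform(data):
    return {"users": data["users"], "avg_revenue": data["revenue"] / data["users"]}


def roundtrip(value):
    """Serialize to JSON and back, approximating a JSON-based XCom hand-off."""
    return json.loads(json.dumps(value))


summary = roundtrip(transform(roundtrip(extract())))
print(summary)  # {'users': 100, 'avg_revenue': 500.0}
```

A value that `json.dumps` rejects (an open file handle or a custom class instance, for example) is a hint that the task should return something simpler, such as a path or an identifier.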
# dags/operators_demo.py: Various operator examples
# These are task definitions only; declare them inside a DAG context to run them.
# Newer provider releases deprecate PostgresOperator in favor of SQLExecuteQueryOperator
# (airflow.providers.common.sql) and rename SimpleHttpOperator to HttpOperator.
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.sensors.filesystem import FileSensor

# SQL execution
create_table = PostgresOperator(
    task_id='create_table',
    postgres_conn_id='my_postgres',
    sql="""
        CREATE TABLE IF NOT EXISTS daily_stats (
            date DATE PRIMARY KEY,
            total_users INT,
            revenue NUMERIC
        );
    """,
)

# HTTP request
fetch_api = SimpleHttpOperator(
    task_id='fetch_api',
    http_conn_id='my_api',
    endpoint='/api/stats',
    method='GET',
    response_filter=lambda r: r.json(),
)

# Wait for file (check every 60 seconds, give up after one hour)
wait_for_file = FileSensor(
    task_id='wait_for_file',
    filepath='/data/incoming/report.csv',
    poke_interval=60,
    timeout=3600,
)
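The `poke_interval` and `timeout` arguments on the sensor above describe a polling loop: check, sleep, repeat until the file appears or the time budget runs out. The following is a simplified model of that loop in plain Python, not Airflow's implementation. The `wait_for_path` helper is hypothetical, and unlike the real sensor (which fails the task on timeout) it simply returns `False`.

```python
import os
import tempfile
import time


def wait_for_path(path, poke_interval=60.0, timeout=3600.0, sleep=time.sleep):
    """Poll until `path` exists; return True on success, False once `timeout` elapses."""
    deadline = time.monotonic() + timeout
    while True:
        if os.path.exists(path):
            return True
        if time.monotonic() >= deadline:
            return False
        sleep(poke_interval)


# An existing file succeeds on the first check; a missing one times out.
with tempfile.TemporaryDirectory() as tmp_dir:
    existing = os.path.join(tmp_dir, "report.csv")
    with open(existing, "w") as handle:
        handle.write("id,amount\n")
    found = wait_for_path(existing, poke_interval=0.01, timeout=0.1)
    missing = wait_for_path(os.path.join(tmp_dir, "absent.csv"), poke_interval=0.01, timeout=0.05)

print(found, missing)  # True False
```

With the values used in the example (`poke_interval=60`, `timeout=3600`), a loop like this would check the path on the order of 60 times before giving up.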
# connections.sh: Set up connections via CLI
airflow connections add 'my_postgres' \
    --conn-type 'postgres' \
    --conn-host 'localhost' \
    --conn-schema 'mydb' \
    --conn-login 'user' \
    --conn-password 'pass' \
    --conn-port 5432
# Set variables
airflow variables set 'api_key' 'abc123'
airflow variables set 'config' '{"batch_size": 1000}' --json
# Trigger a DAG
airflow dags trigger hello_world --conf '{"date": "2026-02-19"}'
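Hand-quoting the JSON passed to `--conf` is easy to get wrong once the payload grows. As a sketch, the command line above can be built from a Python dict with `json.dumps` and `shlex.quote`. The helper name `build_trigger_command` is an assumption for illustration; only the `airflow dags trigger ... --conf` form comes from the example above.

```python
import json
import shlex


def build_trigger_command(dag_id, conf):
    """Return a shell-safe `airflow dags trigger` command line with a JSON --conf payload."""
    payload = json.dumps(conf, sort_keys=True)
    parts = ["airflow", "dags", "trigger", shlex.quote(dag_id), "--conf", shlex.quote(payload)]
    return " ".join(parts)


cmd = build_trigger_command("hello_world", {"date": "2026-02-19"})
print(cmd)  # airflow dags trigger hello_world --conf '{"date": "2026-02-19"}'
```

Inside a task, the triggered run's payload is typically read from the run context as `dag_run.conf`.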