skills/astronomer/blueprint/SKILL.md
Define reusable Airflow task group templates with Pydantic validation and compose DAGs from YAML. Use when creating blueprint templates, composing DAGs from YAML, validating configurations, or enabling no-code DAG authoring for non-engineers.
You are helping a user work with Blueprint, a system for composing Airflow DAGs from YAML using reusable Python templates. Execute steps in order and prefer the simplest configuration that meets the user's needs.
Package: airflow-blueprint on PyPI
Repo: https://github.com/astronomer/blueprint
Requires: Python 3.10+, Airflow 2.5+, Blueprint 0.2.0+
Confirm with the user:
| User Request | Action |
|--------------|--------|
| "Create a blueprint" / "Define a template" | Go to Creating Blueprints |
| "Create a DAG from YAML" / "Compose steps" | Go to Composing DAGs in YAML |
| "Customize DAG args" / "Add tags to DAG" | Go to Customizing DAG-Level Configuration |
| "Override config at runtime" / "Trigger with params" | Go to Runtime Parameter Overrides |
| "Post-process DAGs" / "Add callback" | Go to Post-Build Callbacks |
| "Validate my YAML" / "Lint blueprint" | Go to Validation Commands |
| "Set up blueprint in my project" | Go to Project Setup |
| "Version my blueprint" | Go to Versioning |
| "Generate schema" / "Astro IDE setup" | Go to Schema Generation |
| Blueprint errors / troubleshooting | Go to Troubleshooting |
If the user is starting fresh, guide them through setup:
# Add to requirements.txt
airflow-blueprint>=0.2.0
# Or install directly
pip install airflow-blueprint
Create dags/loader.py:
from blueprint import build_all
build_all()
DAG-level configuration (schedule, description, tags, default_args, etc.) is handled via YAML fields and BlueprintDagArgs templates — see Customizing DAG-Level Configuration.
uvx --from airflow-blueprint blueprint list
If no blueprints are found, the user needs to create blueprint classes first.
When the user wants to create a new blueprint template:
# dags/templates/my_blueprints.py
from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup
from blueprint import Blueprint, BaseModel, Field

class MyConfig(BaseModel):
    # Required field with description (used in CLI output and JSON schema)
    source_table: str = Field(description="Source table name")
    # Optional field with default and validation
    batch_size: int = Field(default=1000, ge=1)

class MyBlueprint(Blueprint[MyConfig]):
    """Docstring becomes blueprint description."""

    def render(self, config: MyConfig) -> TaskGroup:
        with TaskGroup(group_id=self.step_id) as group:
            BashOperator(
                task_id="my_task",
                bash_command=f"echo '{config.source_table}'"
            )
        return group
| Element | Requirement |
|---------|-------------|
| Config class | Must inherit from BaseModel |
| Blueprint class | Must inherit from Blueprint[ConfigClass] |
| render() method | Must return TaskGroup or BaseOperator |
| Task IDs | Use self.step_id for the group/task ID |
Suggest adding extra="forbid" to catch YAML typos:
from pydantic import ConfigDict
class MyConfig(BaseModel):
    model_config = ConfigDict(extra="forbid")
    # fields...
When the user wants to create a DAG from blueprints:
# dags/my_pipeline.dag.yaml
dag_id: my_pipeline
schedule: "@daily"
description: "My data pipeline"
steps:
  step_one:
    blueprint: my_blueprint
    source_table: raw.customers
    batch_size: 500
  step_two:
    blueprint: another_blueprint
    depends_on: [step_one]
    target: analytics.output
By default, only schedule and description are supported as DAG-level fields (via the built-in DefaultDagArgs). For other fields like tags, default_args, catchup, etc., see Customizing DAG-Level Configuration.
| Key | Purpose |
|-----|---------|
| blueprint | Template name (required) |
| depends_on | List of upstream step names |
| version | Pin to specific blueprint version |
Everything else passes to the blueprint's config.
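To make that split concrete, here is a minimal pure-Python sketch of how one step mapping divides into reserved keys and config fields. The helper `split_step` and the `RESERVED_KEYS` constant are hypothetical names used only for illustration; Blueprint does this internally and its real implementation may differ.

```python
# Hypothetical sketch: not Blueprint's actual implementation.
# The reserved key names come from the table above.
RESERVED_KEYS = {"blueprint", "depends_on", "version"}


def split_step(step: dict) -> tuple[dict, dict]:
    """Return (reserved, config) for one entry under `steps:`."""
    if "blueprint" not in step:
        raise ValueError("every step needs a 'blueprint' key")
    reserved = {k: v for k, v in step.items() if k in RESERVED_KEYS}
    config = {k: v for k, v in step.items() if k not in RESERVED_KEYS}
    return reserved, config


# step_two from the YAML example, as it would look after parsing
step_two = {
    "blueprint": "another_blueprint",
    "depends_on": ["step_one"],
    "target": "analytics.output",
}
reserved, config = split_step(step_two)
print(reserved)  # {'blueprint': 'another_blueprint', 'depends_on': ['step_one']}
print(config)    # {'target': 'analytics.output'}
```

Only the `config` part would be validated against the blueprint's Pydantic model, which is why a misspelled config key is caught when `extra="forbid"` is set.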
YAML supports Jinja2 templating with access to environment variables, Airflow variables/connections, and runtime context:
dag_id: "{{ env.get('ENV', 'dev') }}_pipeline"
schedule: "{{ var.value.schedule | default('@daily') }}"
steps:
  extract:
    blueprint: extract
    output_path: "/data/{{ context.ds_nodash }}/output.csv"
    run_id: "{{ context.dag_run.run_id }}"
Available template variables:
- env: environment variables
- var: Airflow Variables
- conn: Airflow Connections
- context: proxy that generates Airflow template expressions for runtime macros (e.g. context.ds_nodash, context.dag_run.conf, context.task_instance.xcom_pull(...))

By default, Blueprint supports schedule and description as DAG-level YAML fields. To use other DAG constructor arguments (tags, default_args, catchup, etc.), define a BlueprintDagArgs subclass.
Use this when you need tags, default_args, catchup, start_date, or any other DAG kwargs in YAML:
# dags/templates/my_dag_args.py
from typing import Any

from pydantic import BaseModel
from blueprint import BlueprintDagArgs

class MyDagArgsConfig(BaseModel):
    schedule: str | None = None
    description: str | None = None
    tags: list[str] = []
    owner: str = "data-team"
    retries: int = 2

class MyDagArgs(BlueprintDagArgs[MyDagArgsConfig]):
    def render(self, config: MyDagArgsConfig) -> dict[str, Any]:
        # Flat YAML fields are regrouped into Airflow DAG() kwargs here
        return {
            "schedule": config.schedule,
            "description": config.description,
            "tags": config.tags,
            "default_args": {
                "owner": config.owner,
                "retries": config.retries,
            },
        }
Then in YAML, the extra fields are validated by the config model:
dag_id: my_pipeline
schedule: "@daily"
tags: [etl, production]
owner: data-team
retries: 3
steps:
  extract:
    blueprint: extract
    source_table: raw.data
- Only one BlueprintDagArgs subclass per project (raises MultipleDagArgsError if more than one exists)
- The render() method returns a dict of kwargs passed to the Airflow DAG() constructor
- If no subclass is defined, DefaultDagArgs is used (supports only schedule and description)

Blueprint config fields can be overridden at DAG trigger time using Airflow params. This enables users to customize behavior when manually triggering DAGs from the Airflow UI.

self.param() in Template Fields

Use self.param("field") in operator template fields to make a config field overridable at runtime:
class ExtractConfig(BaseModel):
    query: str = Field(description="SQL query to run")
    batch_size: int = Field(default=1000, ge=1)

class Extract(Blueprint[ExtractConfig]):
    def render(self, config: ExtractConfig) -> TaskGroup:
        with TaskGroup(group_id=self.step_id) as group:
            BashOperator(
                task_id="run_query",
                bash_command=f"run-etl --query {self.param('query')} --batch {self.param('batch_size')}"
            )
        return group
self.resolve_config() in Python Callables

For @task or PythonOperator callables, use self.resolve_config() to merge runtime params into config:
from airflow.decorators import task

class Extract(Blueprint[ExtractConfig]):
    def render(self, config: ExtractConfig) -> TaskGroup:
        bp = self  # capture reference for closure

        @task(task_id="run_query")
        def run_query(**context):
            resolved = bp.resolve_config(config, context)
            # resolved.query has the runtime override if one was provided
            # execute() is a placeholder for your own query runner
            execute(resolved.query, resolved.batch_size)

        with TaskGroup(group_id=self.step_id) as group:
            run_query()
        return group
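As a rough mental model of that merge, the sketch below overlays runtime params onto a step's config in plain Python. It assumes params are keyed as `step_name__field`; the function `merge_runtime_params` is a hypothetical stand-in, not the body of `resolve_config()`, and it skips the Pydantic validation the real method performs.

```python
# Hypothetical sketch of overlaying step-namespaced params onto a config.
# Assumption: runtime params are keyed as "<step_id>__<field>".
def merge_runtime_params(step_id: str, config: dict, params: dict) -> dict:
    prefix = f"{step_id}__"
    merged = dict(config)  # never mutate the build-time config
    for key, value in params.items():
        if not key.startswith(prefix):
            continue  # param belongs to a different step
        field = key[len(prefix):]
        if field in merged:
            merged[field] = value
    return merged


config = {"query": "SELECT 1", "batch_size": 1000}
params = {"extract__batch_size": 50, "load__batch_size": 10}
resolved = merge_runtime_params("extract", config, params)
print(resolved)  # {'query': 'SELECT 1', 'batch_size': 50}
```

Namespacing by step keeps two steps that share a field name, such as `batch_size`, from overriding each other.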
- Runtime params are namespaced per step (step_name__field)
- Invalid overrides raise a ValidationError at execution time

Use on_dag_built to post-process DAGs after they are constructed. This is useful for adding tags, access controls, audit metadata, or any cross-cutting concern.
from pathlib import Path
from blueprint import build_all
def add_audit_tags(dag, yaml_path: Path) -> None:
    dag.tags.append("managed-by-blueprint")
    dag.tags.append(f"source:{yaml_path.name}")
build_all(on_dag_built=add_audit_tags)
The callback receives:
- dag: the constructed Airflow DAG object (mutable)
- yaml_path: the Path to the YAML file that defined the DAG

Run CLI commands with uvx:
uvx --from airflow-blueprint blueprint <command>
| Command | When to Use |
|---------|-------------|
| blueprint list | Show available blueprints |
| blueprint describe <name> | Show config schema for a blueprint |
| blueprint describe <name> -v N | Show schema for specific version |
| blueprint lint | Validate all *.dag.yaml files |
| blueprint lint <path> | Validate specific file |
| blueprint schema <name> | Generate JSON schema |
| blueprint new | Interactive DAG YAML creation |
# Check all YAML files
blueprint lint
# Expected output for valid files:
# PASS customer_pipeline.dag.yaml (dag_id=customer_pipeline)
When the user needs to version blueprints for backwards compatibility, use the class name convention:
- v1: MyBlueprint (no suffix)
- v2: MyBlueprintV2
- v3: MyBlueprintV3

# v1 - original
class ExtractConfig(BaseModel):
    source_table: str

class Extract(Blueprint[ExtractConfig]):
    def render(self, config): ...

# v2 - breaking changes, new class
class ExtractV2Config(BaseModel):
    sources: list[dict]  # Different schema

class ExtractV2(Blueprint[ExtractV2Config]):
    def render(self, config): ...
As an alternative to the class name convention, blueprints can set name and version directly:
class MyCustomExtractor(Blueprint[ExtractV3Config]):
    name = "extract"
    version = 3

    def render(self, config): ...
This is useful when the class name doesn't follow the NameV{N} convention or when you want clearer control.
steps:
  # Pin to v1
  legacy_extract:
    blueprint: extract
    version: 1
    source_table: raw.data
  # Use latest (v2)
  new_extract:
    blueprint: extract
    sources: [{table: orders}]
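The class name convention can be pictured as a small parsing step. The sketch below is an assumption about how a name like `ExtractV2` might map to the YAML pair `blueprint: extract`, `version: 2`; the function `parse_blueprint_class_name` is hypothetical, and the snake_case conversion is inferred from the examples in this document (`MyBlueprint` is referenced as `my_blueprint`), not taken from Blueprint's source.

```python
import re


def parse_blueprint_class_name(class_name: str) -> tuple[str, int]:
    """Hypothetical helper: split 'NameV{N}' into (snake_case name, version).

    A class name without a V{N} suffix is treated as version 1.
    """
    match = re.fullmatch(r"(?P<base>.+?)V(?P<version>\d+)", class_name)
    if match:
        base, version = match["base"], int(match["version"])
    else:
        base, version = class_name, 1
    # Insert "_" before every capital letter except the first, then lowercase
    snake = re.sub(r"(?<!^)(?=[A-Z])", "_", base).lower()
    return snake, version


print(parse_blueprint_class_name("Extract"))        # ('extract', 1)
print(parse_blueprint_class_name("ExtractV2"))      # ('extract', 2)
print(parse_blueprint_class_name("MyBlueprintV3"))  # ('my_blueprint', 3)
```

When a class name does not fit this pattern, setting `name` and `version` explicitly on the class, as described above, avoids relying on any parsing at all.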
Generate JSON schemas for editor autocompletion or external tooling:
# Generate schema for a blueprint
blueprint schema extract > extract.schema.json
After creating or modifying a blueprint, automatically check if the project is an Astro project by looking for a .astro/ directory (created by astro dev init).
If the project is an Astro project, automatically regenerate schemas without prompting:
mkdir -p blueprint/generated-schemas
# For each name from `blueprint list`: blueprint schema NAME > blueprint/generated-schemas/NAME.schema.json
The Astro IDE reads blueprint/generated-schemas/ to render configuration forms. Keeping schemas in sync ensures the visual builder always reflects the latest blueprint configs.
If you cannot determine whether the project is an Astro project, ask the user once and remember for the rest of the session.
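The detection and regeneration steps above could be scripted roughly as follows. This is a sketch under stated assumptions: the helper names are hypothetical, the blueprint names are passed in explicitly because the output format of `blueprint list` is not specified here, and `regenerate` assumes the `blueprint` CLI is on the PATH.

```python
import subprocess
import tempfile
from pathlib import Path


def is_astro_project(root: Path) -> bool:
    """An Astro project is recognised by its .astro/ directory."""
    return (root / ".astro").is_dir()


def schema_jobs(names: list[str], root: Path) -> list[tuple[list[str], Path]]:
    """Pair each `blueprint schema NAME` command with its output file."""
    out_dir = root / "blueprint" / "generated-schemas"
    return [
        (["blueprint", "schema", name], out_dir / f"{name}.schema.json")
        for name in names
    ]


def regenerate(names: list[str], root: Path) -> None:
    """Run every job, redirecting stdout to the schema file (needs the CLI)."""
    for argv, path in schema_jobs(names, root):
        path.parent.mkdir(parents=True, exist_ok=True)
        with path.open("w") as fh:
            subprocess.run(argv, stdout=fh, check=True, cwd=root)


# Dry run against a throwaway directory: plan the jobs without invoking the CLI
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / ".astro").mkdir()
    astro = is_astro_project(root)
    jobs = schema_jobs(["extract", "load"], root)
    targets = [path.relative_to(root).as_posix() for _, path in jobs]

print(astro)    # True
print(targets)  # ['blueprint/generated-schemas/extract.schema.json', 'blueprint/generated-schemas/load.schema.json']
```

Separating the planning step from the subprocess call makes it possible to show the user which files will be rewritten before anything runs.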
Cause: Blueprint class not in Python path.
Fix: Check template directory or use --template-dir:
blueprint list --template-dir dags/templates/
Cause: YAML field name typo with extra="forbid" enabled.
Fix: Run blueprint describe <name> to see valid field names.
Cause: Missing or broken loader.
Fix: Ensure dags/loader.py exists and calls build_all():
from blueprint import build_all
build_all()
As of v0.2.0, Pydantic validation errors are surfaced as Airflow import errors with actionable messages instead of being silently swallowed. The error message includes details on missing fields, unexpected fields, and type mismatches, along with guidance to run blueprint lint or blueprint describe.
Cause: Circular depends_on references.
Fix: Review step dependencies and remove cycles.
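When the cycle is not obvious from reading the YAML, a quick check with Python's standard-library `graphlib` can point at the steps involved. This is an illustrative sketch that works on an already-parsed `steps` mapping; `find_cycle` is a hypothetical helper and says nothing about how Blueprint itself detects cycles.

```python
from graphlib import CycleError, TopologicalSorter
from typing import Optional


def find_cycle(steps: dict) -> Optional[list]:
    """Return the step names forming a dependency cycle, or None if acyclic."""
    # Map each step to the set of steps it depends on (its predecessors)
    graph = {name: set(step.get("depends_on", [])) for name, step in steps.items()}
    try:
        TopologicalSorter(graph).prepare()
    except CycleError as exc:
        # args[1] holds the cycle; its first and last entries are the same node
        return list(exc.args[1])
    return None


ok_steps = {
    "extract": {"blueprint": "extract"},
    "load": {"blueprint": "load", "depends_on": ["extract"]},
}
bad_steps = {
    "extract": {"blueprint": "extract", "depends_on": ["load"]},
    "load": {"blueprint": "load", "depends_on": ["extract"]},
}
print(find_cycle(ok_steps))   # None
print(find_cycle(bad_steps))  # a list such as extract -> load -> extract
```

Removing any one `depends_on` edge named in the returned list is enough to break that particular cycle.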
Cause: More than one BlueprintDagArgs subclass discovered in the project.
Fix: Only one BlueprintDagArgs subclass is allowed. Remove or merge duplicates.
Every Blueprint task has extra fields in Rendered Template:
- blueprint_step_config - resolved YAML config
- blueprint_step_code - Python source of blueprint

Before finishing, verify with user:
- blueprint list shows their templates
- blueprint lint passes for all YAML files
- dags/loader.py exists with build_all()