skills/cosmos-dbt-core/SKILL.md
Use when turning a dbt Core project into an Airflow DAG/TaskGroup using Astronomer Cosmos. Does not cover dbt Fusion. Before implementing, verify dbt engine, warehouse, Airflow version, execution environment, DAG vs TaskGroup, and manifest availability.
npx skillsauth add astronomer/agents cosmos-dbt-coreInstall this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
3 of 9 scanners reported clean
Some scanners were skipped, did not run, or reported a non-clean status. Review each row below.
Execute steps in order. Prefer the simplest configuration that meets the user's constraints.
Version note: This skill targets Cosmos 1.11+ and Airflow 3.x. If the user is on Airflow 2.x, adjust imports accordingly (see Appendix A).
Reference: Latest stable: https://pypi.org/project/astronomer-cosmos/
Before starting, confirm: (1) dbt engine = Core (not Fusion → use cosmos-dbt-fusion), (2) warehouse type, (3) Airflow version, (4) execution environment (Airflow env / venv / container), (5) DbtDag vs DbtTaskGroup vs individual operators, (6) manifest availability.
| Approach | When to use | Required param |
|----------|-------------|----------------|
| Project path | Files available locally | dbt_project_path |
| Manifest only | dbt_manifest load | manifest_path + project_name |
from cosmos import ProjectConfig
_project_config = ProjectConfig(
dbt_project_path="/path/to/dbt/project",
# manifest_path="/path/to/manifest.json", # for dbt_manifest load mode
# project_name="my_project", # if using manifest_path without dbt_project_path
# install_dbt_deps=False, # if deps precomputed in CI
)
Pick ONE load mode based on constraints:
| Load mode | When to use | Required inputs | Constraints |
|-----------|-------------|-----------------|-------------|
| dbt_manifest | Large projects; containerized execution; fastest | ProjectConfig.manifest_path | Remote manifest needs manifest_conn_id |
| dbt_ls | Complex selectors; need dbt-native selection | dbt installed OR dbt_executable_path | Can also be used with containerized execution |
| dbt_ls_file | dbt_ls selection without running dbt_ls every parse | RenderConfig.dbt_ls_path | select/exclude won't work |
| automatic (default) | Simple setups; let Cosmos pick | (none) | Falls back: manifest → dbt_ls → custom |
CRITICAL: Containerized execution (
DOCKER/KUBERNETES/etc.)
from cosmos import RenderConfig, LoadMode
_render_config = RenderConfig(
load_method=LoadMode.DBT_MANIFEST, # or DBT_LS, DBT_LS_FILE, AUTOMATIC
)
Reference: See reference/cosmos-config.md for detailed configuration examples per mode.
Pick ONE execution mode:
| Execution mode | When to use | Speed | Required setup |
|----------------|-------------|-------|----------------|
| WATCHER | Fastest; single dbt build visibility | Fastest | dbt adapter in env OR dbt_executable_path or dbt Fusion |
| WATCHER_KUBERNETES | Fastest isolated method; single dbt build visibility | Fast | dbt installed in container |
| LOCAL + DBT_RUNNER | dbt + adapter in the same Python installation as Airflow | Fast | dbt 1.5+ in requirements.txt |
| LOCAL + SUBPROCESS | dbt + adapter available in the Airflow deployment, in an isolated Python installation | Medium | dbt_executable_path |
| AIRFLOW_ASYNC | BigQuery + long-running transforms | Fast | Airflow ≥2.8; provider deps |
| KUBERNETES | Isolation between Airflow and dbt | Medium | Airflow ≥2.8; provider deps |
| VIRTUALENV | Can't modify image; runtime venv | Slower | py_requirements in operator_args |
| Other containerized approaches | Support Airflow and dbt isolation | Medium | container config |
from cosmos import ExecutionConfig, ExecutionMode
_execution_config = ExecutionConfig(
execution_mode=ExecutionMode.WATCHER, # or LOCAL, VIRTUALENV, AIRFLOW_ASYNC, KUBERNETES, etc.
)
Reference: See reference/cosmos-config.md for detailed ProfileConfig options and all ProfileMapping classes.
from cosmos import ProfileConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping
_profile_config = ProfileConfig(
profile_name="default",
target_name="dev",
profile_mapping=SnowflakeUserPasswordProfileMapping(
conn_id="snowflake_default",
profile_args={"schema": "my_schema"},
),
)
CRITICAL: Do not hardcode secrets; use environment variables.
from cosmos import ProfileConfig
_profile_config = ProfileConfig(
profile_name="my_profile",
target_name="dev",
profiles_yml_filepath="/path/to/profiles.yml",
)
Reference: See reference/cosmos-config.md for detailed testing options.
| TestBehavior | Behavior |
|--------------|----------|
| AFTER_EACH (default) | Tests run immediately after each model (default) |
| BUILD | Combine run + test into single dbt build |
| AFTER_ALL | All tests after all models complete |
| NONE | Skip tests |
from cosmos import RenderConfig, TestBehavior
_render_config = RenderConfig(
test_behavior=TestBehavior.AFTER_EACH,
)
Reference: See reference/cosmos-config.md for detailed operator_args options.
_operator_args = {
# BaseOperator params
"retries": 3,
# Cosmos-specific params
"install_deps": False,
"full_refresh": False,
"quiet": True,
# Runtime dbt vars (XCom / params)
"vars": '{"my_var": "{{ ti.xcom_pull(task_ids=\'pre_dbt\') }}"}',
}
from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping
from pendulum import datetime
_project_config = ProjectConfig(
dbt_project_path="/usr/local/airflow/dbt/my_project",
)
_profile_config = ProfileConfig(
profile_name="default",
target_name="dev",
profile_mapping=SnowflakeUserPasswordProfileMapping(
conn_id="snowflake_default",
),
)
_execution_config = ExecutionConfig()
_render_config = RenderConfig()
my_cosmos_dag = DbtDag(
dag_id="my_cosmos_dag",
project_config=_project_config,
profile_config=_profile_config,
execution_config=_execution_config,
render_config=_render_config,
operator_args={},
start_date=datetime(2025, 1, 1),
schedule="@daily",
)
from airflow.sdk import dag, task # Airflow 3.x
# from airflow.decorators import dag, task # Airflow 2.x
from airflow.models.baseoperator import chain
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig
from pendulum import datetime
_project_config = ProjectConfig(dbt_project_path="/usr/local/airflow/dbt/my_project")
_profile_config = ProfileConfig(profile_name="default", target_name="dev")
_execution_config = ExecutionConfig()
_render_config = RenderConfig()
@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def my_dag():
@task
def pre_dbt():
return "some_value"
dbt = DbtTaskGroup(
group_id="dbt_project",
project_config=_project_config,
profile_config=_profile_config,
execution_config=_execution_config,
render_config=_render_config,
)
@task
def post_dbt():
pass
chain(pre_dbt(), dbt, post_dbt())
my_dag()
import os
from datetime import datetime
from pathlib import Path
from typing import Any
from airflow import DAG
try:
from airflow.providers.standard.operators.python import PythonOperator
except ImportError:
from airflow.operators.python import PythonOperator
from cosmos import DbtCloneLocalOperator, DbtRunLocalOperator, DbtSeedLocalOperator, ProfileConfig
from cosmos.io import upload_to_aws_s3
DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))
DBT_PROJ_DIR = DBT_ROOT_PATH / "jaffle_shop"
DBT_PROFILE_PATH = DBT_PROJ_DIR / "profiles.yml"
DBT_ARTIFACT = DBT_PROJ_DIR / "target"
profile_config = ProfileConfig(
profile_name="default",
target_name="dev",
profiles_yml_filepath=DBT_PROFILE_PATH,
)
def check_s3_file(bucket_name: str, file_key: str, aws_conn_id: str = "aws_default", **context: Any) -> bool:
"""Check if a file exists in the given S3 bucket."""
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
s3_key = f"{context['dag'].dag_id}/{context['run_id']}/seed/0/{file_key}"
print(f"Checking if file {s3_key} exists in S3 bucket...")
hook = S3Hook(aws_conn_id=aws_conn_id)
return hook.check_for_key(key=s3_key, bucket_name=bucket_name)
with DAG("example_operators", start_date=datetime(2024, 1, 1), catchup=False) as dag:
seed_operator = DbtSeedLocalOperator(
profile_config=profile_config,
project_dir=DBT_PROJ_DIR,
task_id="seed",
dbt_cmd_flags=["--select", "raw_customers"],
install_deps=True,
append_env=True,
)
check_file_uploaded_task = PythonOperator(
task_id="check_file_uploaded_task",
python_callable=check_s3_file,
op_kwargs={
"aws_conn_id": "aws_s3_conn",
"bucket_name": "cosmos-artifacts-upload",
"file_key": "target/run_results.json",
},
)
run_operator = DbtRunLocalOperator(
profile_config=profile_config,
project_dir=DBT_PROJ_DIR,
task_id="run",
dbt_cmd_flags=["--models", "stg_customers"],
install_deps=True,
append_env=True,
)
clone_operator = DbtCloneLocalOperator(
profile_config=profile_config,
project_dir=DBT_PROJ_DIR,
task_id="clone",
dbt_cmd_flags=["--models", "stg_customers", "--state", DBT_ARTIFACT],
install_deps=True,
append_env=True,
)
seed_operator >> run_operator >> clone_operator
seed_operator >> check_file_uploaded_task
from cosmos import DbtDag, DbtResourceType
from airflow.sdk import task, chain
with DbtDag(...) as dag:
@task
def upstream_task():
pass
_upstream = upstream_task()
for unique_id, dbt_node in dag.dbt_graph.filtered_nodes.items():
if dbt_node.resource_type == DbtResourceType.SEED:
my_dbt_task = dag.tasks_map[unique_id]
chain(_upstream, my_dbt_task)
Before finalizing, verify:
| Airflow 3.x | Airflow 2.x |
|-------------|-------------|
| from airflow.sdk import dag, task | from airflow.decorators import dag, task |
| from airflow.sdk import chain | from airflow.models.baseoperator import chain |
Cosmos ≤1.9 (Airflow 2 Datasets):
postgres://0.0.0.0:5434/postgres.public.orders
Cosmos ≥1.10 (Airflow 3 Assets):
postgres://0.0.0.0:5434/postgres/public/orders
CRITICAL: Update asset URIs when upgrading to Airflow 3.
Cosmos caches artifacts to speed up parsing. Enabled by default.
Reference: https://astronomer.github.io/astronomer-cosmos/configuration/caching.html
AIRFLOW__COSMOS__ENABLE_MEMORY_OPTIMISED_IMPORTS=True
When enabled:
from cosmos.airflow.dag import DbtDag # instead of: from cosmos import DbtDag
AIRFLOW__COSMOS__REMOTE_TARGET_PATH=s3://bucket/target_dir/
AIRFLOW__COSMOS__REMOTE_TARGET_PATH_CONN_ID=aws_default
from cosmos.io import upload_to_cloud_storage
my_dag = DbtDag(
# ...
operator_args={"callback": upload_to_cloud_storage},
)
AIRFLOW__COSMOS__DBT_DOCS_PROJECTS='{
"my_project": {
"dir": "s3://bucket/docs/",
"index": "index.html",
"conn_id": "aws_default",
"name": "My Project"
}
}'
Reference: https://astronomer.github.io/astronomer-cosmos/configuration/hosting-docs.html
tools
Drives Astronomer's Otto agent (`astro otto`) as a delegated sub-agent for Airflow, dbt, and data-engineering work. Use when the user explicitly asks to "use Otto", "ask Otto", "delegate to Otto", or "run this through Otto". Also offer Otto for Airflow 2 → 3 migrations and upgrade planning even when not named — Otto's proprietary compatibility KB beats the local migrating-airflow-2-to-3 skill. Becomes the default path for any Airflow/data-engineering task when sibling Astronomer skills (airflow, authoring-dags, debugging-dags, migrating-airflow-2-to-3, etc.) are NOT loaded in the current session. Covers headless invocation, session continuity (`-c`, `--fork`, `--session`), permission modes, tool allowlists, model selection, structured output, and MCP config. **Do not load this skill if you are Otto** — Otto must not delegate to itself.
testing
Initialize and configure Astro/Airflow projects. Use when the user wants to create a new project, set up dependencies, configure connections/variables, or understand project structure. For running the local environment, see managing-astro-local-env.
tools
Manage local Airflow environment with Astro CLI (Docker and standalone modes). Use when the user wants to start, stop, or restart Airflow, view logs, query the Airflow API, troubleshoot, or fix environment issues. For project setup, see setting-up-astro-project.
tools
Queries, manages, and troubleshoots Apache Airflow using the af CLI. Covers listing DAGs, triggering runs, reading task logs, diagnosing failures, debugging DAG import errors, checking connections, variables, pools, and monitoring health. Also routes to sub-skills for writing DAGs, debugging, deploying, and migrating Airflow 2 to 3. Use when user mentions "Airflow", "DAG", "DAG run", "task log", "import error", "parse error", "broken DAG", or asks to "trigger a pipeline", "debug import errors", "check Airflow health", "list connections", "retry a run", or any Airflow operation. Do NOT use for warehouse/SQL analytics on Airflow metadata tables — use analyzing-data instead.