skills/migrate-airflow-kestra/SKILL.md
Migrate an Airflow DAG to a production-ready Kestra flow. Extracts Python task logic into namespace files, maps DAG dependencies to Kestra tasks, and preserves parallel execution structure.
`npx skillsauth add kestra-io/agent-skills migrate-airflow-kestra`

Install this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
Migrate an Apache Airflow DAG file to a production-ready Kestra flow with proper namespace files, correct task ordering, and schema-validated YAML.
Use this skill when the request involves migrating from Apache Airflow to Kestra, including:
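- Converting an Airflow DAG file (`.py`) to a Kestra flow YAML
- Mapping `@task`-decorated functions or operators into Kestra tasks

Inputs:

- Path to the DAG `.py` file (`$ARGUMENTS`)
- Output directory (`kestra-migrate/` next to the DAG)
- Namespace (`company.team`)

Read the DAG file in full. Extract:

- `dag_id`, `schedule`, `default_args`, `tags`
- For each `@task`-decorated function or operator, note:
  - the task identifier (`task_id`)
  - its dependencies (`pip` packages used)

Resolve `>>` operators, `set_upstream`/`set_downstream` calls, and direct task invocations into an ordered dependency graph. Independent tasks map to `io.kestra.plugin.core.flow.Parallel`, and data passed between tasks maps to `outputFiles` → `inputFiles` in Kestra.

Fetch the Kestra flow schema:

```bash
curl -s https://api.kestra.io/v1/plugins/schemas/flow -o /tmp/kestra_schema.json
```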
Extract the definitions needed for this flow:
```bash
python3 -c "
# Print a 3000-character excerpt of the schema starting at each needed type name.
needed = [
    'io.kestra.plugin.scripts.python.Commands',
    'io.kestra.plugin.core.flow.Parallel',
    'io.kestra.plugin.core.flow.WorkingDirectory',
    'io.kestra.plugin.core.trigger.Schedule',
]
content = open('/tmp/kestra_schema.json').read()
for t in needed:
    idx = content.find(f'\"{ t }\"')
    if idx >= 0:
        print(f'=== {t} ===')
        print(content[idx:idx+3000])
"
```
Do not generate any YAML until schema definitions have been read and validated.
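The text search above returns a raw excerpt. As a rough alternative, the sketch below parses the schema as JSON and walks it looking for a key equal to the plugin type. The layout of the real schema is not confirmed here, so `find_definition` (a hypothetical helper) makes no assumption about where definitions live, and `sample_schema` is a synthetic stand-in rather than real Kestra output.

```python
import json


def find_definition(node, type_name):
    """Depth-first search for a dict key equal to type_name.

    Returns the value stored under that key, or None if it is not found.
    """
    if isinstance(node, dict):
        if type_name in node:
            return node[type_name]
        children = list(node.values())
    elif isinstance(node, list):
        children = node
    else:
        return None
    for child in children:
        found = find_definition(child, type_name)
        if found is not None:
            return found
    return None


# Synthetic stand-in for /tmp/kestra_schema.json (the real layout may differ).
sample_schema = {
    "definitions": {
        "io.kestra.plugin.core.flow.Parallel": {"required": ["id", "type", "tasks"]},
    }
}

parallel_def = find_definition(sample_schema, "io.kestra.plugin.core.flow.Parallel")
print(json.dumps(parallel_def, indent=2))
```

To try it on the downloaded file, load it with `json.load(open("/tmp/kestra_schema.json"))` and pass the result as `node`. If the type name only appears as a value rather than a key, this lookup returns `None` and the text search remains the fallback.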
For every task that contains important business logic (see rule below), extract the Python function body into a standalone script under <output-dir>/scripts/.
Namespace file rules:
- Name each script `<task_id>.py`
- Read inputs from local files (e.g. `products.json`), write outputs to local files (e.g. `category_stats.json`)
- Remove Airflow-specific imports (`from airflow …`, `from pendulum …`); they have no equivalent
- Replace `ti.xcom_push` / return values with `json.dump(result, open("output.json", "w"))`
- Replace `ti.xcom_pull` with `json.load(open("input.json"))`

When to use a namespace file vs inline:
| Situation | Decision |
|---|---|
| Task has pandas/numpy/ML logic | Namespace file |
| Task makes HTTP requests with processing | Namespace file |
| Task has > 10 lines of logic | Namespace file |
| Task is a single API call or shell command | Inline in flow |
| Task is a BashOperator one-liner | Inline in flow |
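As an illustration of the namespace file rules, here is a minimal sketch of what an extracted `scripts/compute_category_stats.py` might look like. The `category` and `price` fields and the statistics computed are hypothetical; the real logic comes from the body of the Airflow task being migrated.

```python
import json
import os
import tempfile
from collections import defaultdict


def compute_category_stats(products):
    """Return product count and average price per category."""
    totals = defaultdict(lambda: {"count": 0, "price_sum": 0.0})
    for product in products:
        bucket = totals[product["category"]]
        bucket["count"] += 1
        bucket["price_sum"] += float(product["price"])
    return {
        category: {
            "count": bucket["count"],
            "avg_price": round(bucket["price_sum"] / bucket["count"], 2),
        }
        for category, bucket in totals.items()
    }


def main(input_path="products.json", output_path="category_stats.json"):
    # Replaces ti.xcom_pull: the upstream result arrives as a local file.
    with open(input_path) as f:
        products = json.load(f)
    stats = compute_category_stats(products)
    # Replaces the return value / ti.xcom_push: write the result to a local file.
    with open(output_path, "w") as f:
        json.dump(stats, f)


if __name__ == "__main__":
    # Demo only: run against sample data in a throwaway directory.
    with tempfile.TemporaryDirectory() as workdir:
        src = os.path.join(workdir, "products.json")
        dst = os.path.join(workdir, "category_stats.json")
        with open(src, "w") as f:
            json.dump([{"category": "tools", "price": 10.0}], f)
        main(src, dst)
        with open(dst) as f:
            print(json.load(f))
```

In the actual namespace file the demo block would shrink to a bare `main()` call, so the script reads `products.json` and writes `category_stats.json` in the task's working directory. Note that the script contains no Airflow imports.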
Apply all rules below. Write the flow to <output-dir>/flow.yaml.
The dash (`-`) MUST be on the `id` line, and `type` MUST be the very next line.

This is the single most important formatting rule. Every task block in `tasks:` (including nested tasks inside `Parallel`) must follow this exact pattern:

- The dash (`-`) goes on the `id:` line, never on `type:` or any other property.
- `type:` is always the second line, indented to align with `id:` (no dash).
- All other properties (`containerImage`, `namespaceFiles`, `commands`, etc.) follow after `type:`, at the same indentation level as `type:`.

```yaml
# ✅ CORRECT: dash on id, type immediately after, then everything else
- id: fetch_products
  type: io.kestra.plugin.scripts.python.Commands
  containerImage: python:3.11
  namespaceFiles:
    enabled: true
    include:
      - scripts/fetch_products.py
  commands:
    - python scripts/fetch_products.py
```
```yaml
# ❌ WRONG: dash on type instead of id
- type: io.kestra.plugin.scripts.python.Commands
  id: fetch_products

# ❌ WRONG: dash on containerImage
- containerImage: python:3.11
  id: fetch_products
  type: io.kestra.plugin.scripts.python.Commands

# ❌ WRONG: dash on namespaceFiles
- namespaceFiles:
    enabled: true
  id: fetch_products
  type: io.kestra.plugin.scripts.python.Commands

# ❌ WRONG: dash on commands
- commands:
    - python scripts/fetch_products.py
  id: fetch_products
  type: io.kestra.plugin.scripts.python.Commands

# ❌ WRONG: dash on dependencies
- dependencies:
    - pandas
  id: fetch_products
  type: io.kestra.plugin.scripts.python.Commands

# ❌ WRONG: type is not the second property
- id: fetch_products
  containerImage: python:3.11
  type: io.kestra.plugin.scripts.python.Commands

# ❌ WRONG: extra blank line between id and type
- id: fetch_products

  type: io.kestra.plugin.scripts.python.Commands
```
This rule applies everywhere a task appears: top-level tasks:, inside Parallel.tasks:, inside WorkingDirectory.tasks:, and in triggers: blocks. No exceptions.
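The rule can be checked mechanically before the flow is written out. The sketch below is a plain-text check, not a YAML parser: `check_task_blocks` is a hypothetical helper that flags any mapping list item whose dash is not on `id:` or whose next line is not an aligned `type:`. It assumes it is run only on task and trigger lists, since other lists of mappings would be flagged as well.

```python
import re

# A list item that opens a mapping, e.g. "  - id: fetch_products".
# Scalar items such as "- pandas" or "- python scripts/a.py" do not match.
ITEM = re.compile(r"^(\s*)- (\w+):")


def check_task_blocks(flow_text):
    """Return (line_number, message) tuples for blocks that break the id/type rule."""
    problems = []
    lines = flow_text.splitlines()
    for number, line in enumerate(lines, start=1):
        match = ITEM.match(line)
        if match is None:
            continue
        indent, key = match.groups()
        if key != "id":
            problems.append((number, f"dash is on '{key}', expected 'id'"))
            continue
        # lines is 0-indexed, so lines[number] is the line after the current one.
        following = lines[number] if number < len(lines) else ""
        if not following.startswith(f"{indent}  type:"):
            problems.append((number + 1, "expected 'type:' directly after 'id:'"))
    return problems


good = """tasks:
  - id: fetch_products
    type: io.kestra.plugin.scripts.python.Commands
    commands:
      - python scripts/fetch_products.py
"""
bad = """tasks:
  - id: fetch_products
    containerImage: python:3.11
    type: io.kestra.plugin.scripts.python.Commands
"""
print(check_task_blocks(good))
print(check_task_blocks(bad))
```

An empty list means every block passed. A blank line between `id:` and `type:` is reported the same way as a misplaced property, because the line after `id:` is then not `type:`.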
Every Python task (`io.kestra.plugin.scripts.python.Commands`) must specify the `python:3.11` image:

```yaml
containerImage: python:3.11
```

Never omit this property. Never use `python:3.13-slim` or any other image.
For tasks backed by a namespace file, always declare:
```yaml
namespaceFiles:
  enabled: true
  include:
    - scripts/<task_id>.py
```

Reference the script in `commands`:

```yaml
commands:
  - python scripts/<task_id>.py
```

Use `outputFiles` to capture JSON written by a script and store it in Kestra internal storage:

```yaml
outputFiles:
  - result.json
```

Inject that output into a downstream task via `inputFiles`:

```yaml
inputFiles:
  result.json: "{{ outputs.<upstream_task_id>.outputFiles['result.json'] }}"
```
Tasks with a shared upstream and no mutual dependency must be wrapped in io.kestra.plugin.core.flow.Parallel:
```yaml
- id: compute_analytics
  type: io.kestra.plugin.core.flow.Parallel
  tasks:
    - id: compute_category_stats
      type: io.kestra.plugin.scripts.python.Commands
      containerImage: python:3.11
      ...
    - id: compute_brand_stats
      type: io.kestra.plugin.scripts.python.Commands
      containerImage: python:3.11
      ...
```
Tasks that are independent at the fetch/ingest stage should also run in parallel.
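One way to decide which tasks belong in the same `Parallel` block is to group the dependency graph into levels: every task whose upstreams are all complete joins the next stage. The sketch below assumes the graph has already been extracted as a mapping from task ID to upstream task IDs; `group_into_stages` and the `build_summary` task are hypothetical names. Level grouping is a conservative approximation, since each stage waits for the whole previous stage.

```python
def group_into_stages(dependencies):
    """Group tasks into ordered stages.

    dependencies maps task_id -> iterable of upstream task_ids.
    Returns a list of stages; each stage is a sorted list of task_ids
    whose upstreams all sit in earlier stages.
    """
    remaining = {task: set(upstream) for task, upstream in dependencies.items()}
    # Tasks that only appear as upstreams have no dependencies of their own.
    for upstream in list(remaining.values()):
        for task in upstream:
            remaining.setdefault(task, set())

    stages = []
    while remaining:
        ready = sorted(task for task, upstream in remaining.items() if not upstream)
        if not ready:
            raise ValueError("cycle detected in task dependencies")
        stages.append(ready)
        for task in ready:
            del remaining[task]
        for upstream in remaining.values():
            upstream.difference_update(ready)
    return stages


dag = {
    "compute_category_stats": ["fetch_products"],
    "compute_brand_stats": ["fetch_products"],
    "build_summary": ["compute_category_stats", "compute_brand_stats"],
}
for index, stage in enumerate(group_into_stages(dag), start=1):
    kind = "Parallel" if len(stage) > 1 else "sequential"
    print(f"Stage {index} ({kind}): {stage}")
```

A stage with more than one task becomes a `Parallel` block; a stage with a single task stays a plain top-level task.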
Map Airflow schedule to a Kestra Schedule trigger:
| Airflow | Kestra cron |
|---|---|
| @daily | @daily |
| @hourly | @hourly |
| 0 6 * * * | 0 6 * * * |
| None | Omit trigger entirely |
```yaml
triggers:
  - id: daily
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "@daily"
```
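The schedule table can be expressed as a small function. This is a sketch covering only the rows listed above: the two presets, five-field cron strings, and `None`. `schedule_to_trigger` is a hypothetical helper, and anything outside the table (for example `timedelta` schedules or other presets) is rejected rather than guessed.

```python
KNOWN_PRESETS = {"@daily", "@hourly"}


def schedule_to_trigger(schedule, trigger_id="schedule"):
    """Map an Airflow schedule value to a Kestra Schedule trigger dict.

    Returns None when the DAG has no schedule, meaning the triggers
    block should be omitted from the flow entirely.
    """
    if schedule is None:
        return None
    if not isinstance(schedule, str):
        raise TypeError(f"unsupported schedule type: {type(schedule).__name__}")
    is_cron = len(schedule.split()) == 5
    if schedule not in KNOWN_PRESETS and not is_cron:
        raise ValueError(f"schedule {schedule!r} is not covered by the mapping table")
    return {
        "id": trigger_id,
        "type": "io.kestra.plugin.core.trigger.Schedule",
        "cron": schedule,
    }


print(schedule_to_trigger("@daily", trigger_id="daily"))
print(schedule_to_trigger("0 6 * * *"))
print(schedule_to_trigger(None))
```

The returned dict keeps `id` first and `type` second, so serializing it in insertion order preserves the task formatting rule.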
| Airflow | Kestra |
|---|---|
| dag_id | id |
| default_args.owner | # comment or label |
| tags | labels |
| description | description |
| catchup=False | No equivalent needed |
| retries | Not set at flow level; handle per-task if needed |
Reference secrets with `{{ secret('SECRET_NAME') }}` Pebble expressions.

Expected output layout:

```
<output-dir>/
├── flow.yaml              # Kestra flow
└── scripts/
    ├── <task_id>.py       # One file per task with business logic
    └── ...
```
```yaml
id: <dag_id>
namespace: <namespace>
description: <dag description>

tasks:
  # Stage 1: parallel fetch
  - id: fetch_data
    type: io.kestra.plugin.core.flow.Parallel
    tasks:
      - id: <task_id>
        type: io.kestra.plugin.scripts.python.Commands
        containerImage: python:3.11
        namespaceFiles:
          enabled: true
          include:
            - scripts/<task_id>.py
        dependencies:
          - <pip-package>
        commands:
          - python scripts/<task_id>.py
        outputFiles:
          - <output>.json

  # Stage 2: parallel transforms
  - id: compute_analytics
    type: io.kestra.plugin.core.flow.Parallel
    tasks:
      - id: <task_id>
        type: io.kestra.plugin.scripts.python.Commands
        containerImage: python:3.11
        namespaceFiles:
          enabled: true
          include:
            - scripts/<task_id>.py
        inputFiles:
          <input>.json: "{{ outputs.<upstream_id>.outputFiles['<input>.json'] }}"
        dependencies:
          - <pip-package>
        commands:
          - python scripts/<task_id>.py
        outputFiles:
          - <output>.json

  # Stage 3: sequential summary
  - id: <final_task_id>
    type: io.kestra.plugin.scripts.python.Commands
    containerImage: python:3.11
    namespaceFiles:
      enabled: true
      include:
        - scripts/<final_task_id>.py
    inputFiles:
      <a>.json: "{{ outputs.<task_a>.outputFiles['<a>.json'] }}"
      <b>.json: "{{ outputs.<task_b>.outputFiles['<b>.json'] }}"
    dependencies:
      - <pip-package>
    commands:
      - python scripts/<final_task_id>.py
    outputFiles:
      - <output>.json

triggers:
  - id: schedule
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "<mapped-cron>"
```
After generating the files, if the user wants to deploy, use the kestra-ops skill. Key note on namespace file uploads: always upload files individually to avoid path nesting issues:
```bash
# Upload each script on its own so the remote path is exactly scripts/<file>.
for f in <output-dir>/scripts/*.py; do
  name=$(basename "$f")
  kestractl nsfiles upload <namespace> "$f" "scripts/$name" --override
done
```
Do not use directory upload (kestractl nsfiles upload <ns> ./scripts scripts) — it nests the directory name, producing scripts/scripts/<file> instead of scripts/<file>.
Example requests:

- "Migrate `dags/ingest_pipeline.py` from Airflow to Kestra, output to `kestra/`"
- "… `analytics.products` namespace"
- "Migrate `dags/etl.py` and deploy it using kestra-ops"