skills/migrating-ai-sdk-to-common-ai/SKILL.md
Migrates Airflow projects from airflow-ai-sdk to apache-airflow-providers-common-ai 0.1.0+. Use this skill when the user wants to replace airflow-ai-sdk with the official Airflow AI provider, migrate LLM decorators (@task.llm, @task.agent, @task.llm_branch, @task.embed), switch from model strings/objects to connection-based LLM configuration, or update imports from airflow_ai_sdk to the new provider. Also trigger when the user mentions common-ai provider, AIP-99, pydanticai connection, or migrating away from airflow-ai-sdk.
This skill migrates Airflow projects from airflow-ai-sdk to apache-airflow-providers-common-ai (0.1.0+), the official Airflow AI provider built on PydanticAI.
CRITICAL: The new provider requires Airflow 3.0+ and pydantic-ai-slim >= 1.34.0. The API surface has changed: LLM configuration moves from code (model strings/objects) to Airflow connections (pydanticai type). There is no @task.embed in the new provider.
Use the Grep tool with the pattern below to inventory everything that needs to migrate:
airflow_ai_sdk|airflow-ai-sdk|ai_sdk|@task\.llm|@task\.agent|@task\.llm_branch|@task\.embed
From the results, capture:
- Files that reference airflow-ai-sdk / airflow_ai_sdk
- Decorators in use: @task.llm, @task.agent, @task.llm_branch, @task.embed
- Model configuration (model strings such as "gpt-5", or OpenAIModel(...) objects)
- airflow_ai_sdk.BaseModel subclasses used as output_type

Use this inventory to drive the steps below.
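If the Grep tool is not available, the same inventory can be approximated with a short script. This is a minimal sketch: the `inventory` and `scan` helpers and the list of file suffixes are hypothetical, and the alternation is reordered so that `@task.llm_branch` is not counted as `@task.llm`.

```python
import re
from collections import Counter
from pathlib import Path

# Same alternatives as the grep pattern, with the longer decorator name first
# so "@task.llm_branch" is reported as itself rather than as "@task.llm".
PATTERN = re.compile(
    r"airflow_ai_sdk|airflow-ai-sdk|ai_sdk"
    r"|@task\.llm_branch|@task\.llm|@task\.agent|@task\.embed"
)

# Assumption: these are the file types worth scanning in a typical project.
SUFFIXES = {".py", ".txt", ".toml", ".cfg", ".yaml", ".yml"}


def inventory(text: str) -> Counter:
    """Count every matched token in one blob of text."""
    return Counter(match.group(0) for match in PATTERN.finditer(text))


def scan(root: str = ".") -> dict[str, Counter]:
    """Return {file path: token counts} for files that contain any match."""
    results = {}
    for path in Path(root).rglob("*"):
        if path.is_file() and path.suffix in SUFFIXES:
            counts = inventory(path.read_text(errors="ignore"))
            if counts:
                results[str(path)] = counts
    return results
```

Calling `scan("dags")` and printing the result gives a per-file view of which decorators and imports need attention.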
Remove:
airflow-ai-sdk[openai]
# or any variant: airflow-ai-sdk[openai]==0.1.7, airflow-ai-sdk[anthropic], etc.
Add:
apache-airflow-providers-common-ai[openai]>=0.1.0
Use the latest available 0.x version unless the user has pinned a specific one. Available extras match the LLM provider: [openai], [anthropic], [google], [bedrock], [groq], [mistral], [mcp].
Keep sentence-transformers and torch if the project uses embeddings (they now run via plain @task instead of @task.embed).
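The dependency swap can be scripted for projects with many requirements files. The sketch below is an assumption-laden illustration: `migrate_requirement` is a hypothetical helper, it assumes the old extra name carries over unchanged (true for `[openai]` and `[anthropic]` in the list above), and it deliberately leaves lines with trailing comments untouched.

```python
import re

# "airflow-ai-sdk", an optional [extras] group, and an optional version specifier.
OLD_REQUIREMENT = re.compile(r"^\s*airflow-ai-sdk(\[[^\]]*\])?\s*([=<>!~].*)?$")


def migrate_requirement(line: str, min_version: str = "0.1.0") -> str:
    """Rewrite one requirements line; return unrelated lines unchanged."""
    match = OLD_REQUIREMENT.match(line)
    if match is None:
        return line
    extras = match.group(1) or ""
    # The old version pin is dropped on purpose: it referred to a different package.
    return f"apache-airflow-providers-common-ai{extras}>={min_version}"
```

Lines such as `torch` or `sentence-transformers` pass through as-is, which matches the guidance to keep them when embeddings are in use.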
The new provider uses an Airflow connection instead of model strings or objects in code.
Connection type: pydanticai
Default connection ID: pydanticai_default
AIRFLOW_CONN_PYDANTICAI_DEFAULT='{
  "conn_type": "pydanticai",
  "password": "<api-key>",
  "extra": {
    "model": "<provider>:<model-name>"
  }
}'
The model field uses provider:model format:
| Provider | Example model value |
|----------|-------------------|
| OpenAI | openai:gpt-5 |
| Anthropic | anthropic:claude-sonnet-4-20250514 |
| Google | google:gemini-2.5-pro |
| Groq | groq:llama-3.3-70b-versatile |
| Mistral | mistral:mistral-large-latest |
| Bedrock | bedrock:us.anthropic.claude-sonnet-4-20250514-v1:0 |
For custom or OpenAI-compatible endpoints, set host to the base URL:
AIRFLOW_CONN_PYDANTICAI_CORTEX='{
  "conn_type": "pydanticai",
  "password": "<api-key>",
  "host": "https://my-endpoint.com/v1",
  "extra": {
    "model": "openai:<model-name>"
  }
}'
Use the openai: prefix for any OpenAI-compatible API, regardless of the actual provider.
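Hand-writing JSON inside a shell variable is easy to get wrong, so it can help to generate the value. The following is a minimal sketch, assuming the connection fields shown above; `build_connection_env` is a hypothetical helper, not part of the provider, and it relies on Airflow's `AIRFLOW_CONN_<CONN_ID>` naming for environment-variable connections.

```python
import json
from typing import Optional


def build_connection_env(
    conn_id: str, api_key: str, model: str, host: Optional[str] = None
) -> tuple[str, str]:
    """Return (env var name, JSON value) for a pydanticai connection."""
    payload = {
        "conn_type": "pydanticai",
        "password": api_key,
        "extra": {"model": model},  # "<provider>:<model-name>"
    }
    if host is not None:
        # Only needed for custom or OpenAI-compatible endpoints.
        payload["host"] = host
    return f"AIRFLOW_CONN_{conn_id.upper()}", json.dumps(payload)
```

For example, `build_connection_env("pydanticai_cortex", "<api-key>", "openai:<model-name>", host="https://my-endpoint.com/v1")` yields a name and value that can be written to `.env`.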
The env var name determines the connection ID:
- AIRFLOW_CONN_PYDANTICAI_DEFAULT creates pydanticai_default
- AIRFLOW_CONN_PYDANTICAI_CORTEX creates pydanticai_cortex

The model is resolved in this order:

1. model_id parameter on the decorator/operator (highest)
2. model in connection's extra JSON (fallback)

# BEFORE (airflow-ai-sdk)
import airflow_ai_sdk as ai_sdk

class MyOutput(ai_sdk.BaseModel):
    field: str

@task.llm(
    model="gpt-5",  # or model=OpenAIModel(...)
    system_prompt="You are helpful.",
    output_type=MyOutput,
)
def my_task(text: str) -> str:
    return text

# AFTER (apache-airflow-providers-common-ai)
from pydantic import BaseModel

class MyOutput(BaseModel):
    field: str

@task.llm(
    llm_conn_id="pydanticai_default",  # Airflow connection ID
    system_prompt="You are helpful.",
    output_type=MyOutput,
)
def my_task(text: str) -> str:
    return text
Parameter mapping:
| airflow-ai-sdk | common-ai provider | Notes |
|----------------|-------------------|-------|
| model="gpt-5" | llm_conn_id="pydanticai_default" | Model specified in connection |
| model=OpenAIModel(...) | llm_conn_id="pydanticai_default" | Model + endpoint in connection |
| system_prompt="..." | system_prompt="..." | Unchanged |
| output_type=MyModel | output_type=MyModel | Unchanged |
| result_type=MyModel | output_type=MyModel | result_type was already deprecated |
| (not available) | model_id="openai:gpt-5" | Override connection's model |
| (not available) | require_approval=True | Built-in HITL review |
| (not available) | agent_params={...} | Extra kwargs for pydantic-ai Agent |
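The interaction between `model_id` and the connection's `extra` can be pictured as a two-step lookup. This is only an illustration of the precedence stated in this document, not the provider's actual code; `resolve_model` is a hypothetical function, and raising an error when neither source is set is an assumption.

```python
from typing import Optional


def resolve_model(model_id: Optional[str], conn_extra: dict) -> str:
    """Pick the model: decorator-level model_id first, connection extra second."""
    if model_id:
        return model_id  # per-task override wins
    model = conn_extra.get("model")
    if not model:
        # Assumption: a missing model is treated as a configuration error.
        raise ValueError("No model_id given and the connection extra has no 'model'")
    return model
```

This is why a single `pydanticai_default` connection can serve most tasks while one task opts into a different model through `model_id`.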
# BEFORE
@task.llm_branch(
    model="gpt-5",
    system_prompt="Choose a team...",
    allow_multiple_branches=False,
)
def route(text: str) -> str:
    return text

# AFTER
@task.llm_branch(
    llm_conn_id="pydanticai_default",
    system_prompt="Choose a team...",
    allow_multiple_branches=False,  # same parameter, unchanged
)
def route(text: str) -> str:
    return text
Only change: model= becomes llm_conn_id=.
@task.agent has the biggest API change: the Agent is no longer pre-built in user code.
# BEFORE (airflow-ai-sdk) - Agent built at module level
from pydantic_ai import Agent

my_agent = Agent(
    "gpt-5",
    system_prompt="You are a research assistant.",
    tools=[search_tool, lookup_tool],
)

@task.agent(agent=my_agent)
def research(question: str) -> str:
    return question

# AFTER (common-ai provider) - No Agent object, config via parameters
@task.agent(
    llm_conn_id="pydanticai_default",
    system_prompt="You are a research assistant.",
    agent_params={"tools": [search_tool, lookup_tool]},
)
def research(question: str) -> str:
    return question
Parameter mapping:
| airflow-ai-sdk | common-ai provider | Notes |
|----------------|-------------------|-------|
| agent=Agent(model, ...) | llm_conn_id="..." | Model from connection |
| Agent's system_prompt | system_prompt="..." | Now a decorator param |
| Agent's tools=[...] | agent_params={"tools": [...]} | Tools via agent_params dict |
| Agent's output_type | output_type=MyModel | Now a decorator param |
| (not available) | toolsets=[...] | pydantic-ai 1.x Toolset objects |
| (not available) | durable=True | Step-level caching |
| (not available) | enable_hitl_review=True | Iterative human review loop |
Key insight: Everything that was configured on the Agent() constructor now goes into either a top-level decorator parameter or agent_params. The agent_params dict is passed directly to pydantic-ai's Agent constructor.
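When an Agent() call has many arguments, the split between top-level parameters and agent_params can be worked out mechanically. A minimal sketch, assuming only system_prompt and output_type are promoted to decorator parameters as the mapping table shows; `split_agent_kwargs` is a hypothetical helper, and the model is expected under a `model` key.

```python
# Agent constructor arguments that become top-level decorator parameters.
TOP_LEVEL = {"system_prompt", "output_type"}


def split_agent_kwargs(
    agent_kwargs: dict, llm_conn_id: str = "pydanticai_default"
) -> dict:
    """Translate old Agent(...) kwargs into @task.agent(...) kwargs."""
    decorator_kwargs = {"llm_conn_id": llm_conn_id}
    agent_params = {}
    for key, value in agent_kwargs.items():
        if key == "model":
            continue  # the model now comes from the connection
        if key in TOP_LEVEL:
            decorator_kwargs[key] = value
        else:
            agent_params[key] = value  # e.g. tools
    if agent_params:
        decorator_kwargs["agent_params"] = agent_params
    return decorator_kwargs
```

The returned dict mirrors the AFTER snippet: `llm_conn_id` and `system_prompt` at the top level, `tools` nested under `agent_params`.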
The new provider does NOT include an embed decorator. Replace with a plain @task:
# BEFORE (airflow-ai-sdk)
@task.embed(
    model_name="all-MiniLM-L6-v2",
    encode_kwargs={"normalize_embeddings": True},
    max_active_tis_per_dagrun=1,
)
def embed_text(text: str) -> str:
    return text

# AFTER (plain @task with sentence-transformers)
@task(max_active_tis_per_dagrun=1)
def embed_text(text: str) -> list[float]:
    from sentence_transformers import SentenceTransformer

    model = SentenceTransformer("all-MiniLM-L6-v2")
    return model.encode(text, normalize_embeddings=True).tolist()
Note: The model is loaded on each task execution. For small workloads this is fine. For large batches, consider embedding all texts in a single task instead of using .expand().
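The single-task variant mentioned in the note might look like the sketch below. It is written as a plain function so it can be exercised without Airflow; in a DAG it would be wrapped with @task. The `embed_all` name and the optional `encoder` argument (used to inject a pre-loaded or fake model) are hypothetical.

```python
def embed_all(texts, encoder=None, model_name="all-MiniLM-L6-v2"):
    """Embed every text in one call so the model is loaded only once."""
    if encoder is None:
        # Lazy import keeps DAG parsing light; requires sentence-transformers.
        from sentence_transformers import SentenceTransformer

        encoder = SentenceTransformer(model_name)
    # encode() accepts a list of sentences and returns one vector per sentence.
    vectors = encoder.encode(list(texts), normalize_embeddings=True)
    return vectors.tolist()
```

The trade-off is parallelism: one task embeds the whole batch serially, but the model load cost is paid once instead of once per mapped task instance.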
| Old import | New import |
|-----------|-----------|
| import airflow_ai_sdk as ai_sdk | Remove entirely |
| from airflow_ai_sdk import BaseModel | from pydantic import BaseModel |
| from airflow_ai_sdk.models.base import BaseModel | from pydantic import BaseModel |
| class Foo(ai_sdk.BaseModel): | class Foo(BaseModel): |
| from pydantic_ai import Agent | Remove if Agent was only used for @task.agent |
| from pydantic_ai.models.openai import OpenAIModel | Remove (model config in connection now) |
The @task.llm, @task.agent, @task.llm_branch decorators are auto-registered by the provider. No explicit import needed beyond from airflow.sdk import task.
pydantic_ai imports for non-decorator usage (e.g., BinaryContent for multimodal) are still valid since the new provider depends on pydantic-ai-slim>=1.34.0.
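The BaseModel rows of the import table are regular enough to apply with a few regular expressions. This is a rough sketch rather than a full codemod: `rewrite_imports` is a hypothetical helper, it only handles the exact import spellings listed in the table, and it leaves pydantic_ai imports alone because those need a human decision.

```python
import re

# "from airflow_ai_sdk import BaseModel" and the ".models.base" variant.
FROM_IMPORT = re.compile(
    r"^from airflow_ai_sdk(?:\.models\.base)? import BaseModel[ \t]*$", re.M
)
# The aliased module import, including its trailing newline.
ALIAS_IMPORT = re.compile(r"^import airflow_ai_sdk as ai_sdk[ \t]*\n", re.M)
# Attribute access through the alias, e.g. class Foo(ai_sdk.BaseModel).
ALIAS_MODEL = re.compile(r"\bai_sdk\.BaseModel\b")


def rewrite_imports(source: str) -> str:
    """Apply the BaseModel-related import rewrites to one file's source."""
    uses_alias_model = ALIAS_MODEL.search(source) is not None
    source = FROM_IMPORT.sub("from pydantic import BaseModel", source)
    # If ai_sdk.BaseModel was used, the alias import becomes the pydantic import;
    # otherwise it is removed entirely.
    replacement = "from pydantic import BaseModel\n" if uses_alias_model else ""
    source = ALIAS_IMPORT.sub(replacement, source)
    return ALIAS_MODEL.sub("BaseModel", source)
```

Any other use of the `ai_sdk` alias would be left dangling by this sketch, so rerun the stale-reference grep afterwards.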
pydanticai_default:
  conn_type: pydanticai
  password: <api-key>
  extra:
    model: "openai:gpt-5"
For custom endpoints:
pydanticai_cortex:
  conn_type: pydanticai
  password: <api-key>
  host: https://my-endpoint.com/v1
  extra:
    model: "openai:llama3.1-8b"
The new provider reads model config from the pydanticai connection, so env vars that previously fed the model in code are usually redundant. Before removing any of them, grep the project (and any sibling scripts/services) to confirm nothing else still references them:
OPENAI_API_KEY|OPENAI_BASE_URL|ANTHROPIC_API_KEY|GOOGLE_API_KEY
Candidates for removal only if no other code references them:
- OPENAI_API_KEY (now in the pydanticai connection's password field)
- OPENAI_BASE_URL (now in the connection's host field)

If anything outside the migrated DAGs still uses them (other DAGs not yet migrated, helper scripts, non-Airflow services sharing the .env), leave them in place.
Keep AIRFLOW_CONN_* env vars for all connections.
After migration, grep the codebase to confirm no stale references remain:
airflow_ai_sdk|airflow-ai-sdk|ai_sdk\.BaseModel|from pydantic_ai import Agent|from pydantic_ai.models
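A text search cannot tell whether a `model=` keyword sits on an LLM decorator, so a syntax-aware check can complement it. The sketch below uses Python's standard `ast` module; `find_unmigrated` is a hypothetical helper and only recognizes decorators written as `task.<name>`.

```python
import ast

LLM_DECORATORS = {"llm", "agent", "llm_branch"}


def _task_decorator_name(node):
    """Return 'llm' for @task.llm or @task.llm(...); None for anything else."""
    target = node.func if isinstance(node, ast.Call) else node
    if (
        isinstance(target, ast.Attribute)
        and isinstance(target.value, ast.Name)
        and target.value.id == "task"
    ):
        return target.attr
    return None


def find_unmigrated(source: str) -> list[str]:
    """List decorated functions that still use the airflow-ai-sdk API shape."""
    problems = []
    for node in ast.walk(ast.parse(source)):
        if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            continue
        for decorator in node.decorator_list:
            name = _task_decorator_name(decorator)
            if name == "embed":
                problems.append(f"{node.name}: @task.embed has no replacement decorator")
            elif name in LLM_DECORATORS and isinstance(decorator, ast.Call):
                if any(kw.arg == "model" for kw in decorator.keywords):
                    problems.append(f"{node.name}: model= should become llm_conn_id=")
    return problems
```

An empty list for every DAG file is a reasonable signal that the decorator-level migration is complete.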
Verify:
- No remaining imports of airflow_ai_sdk
- No Agent() objects created for @task.agent (unless used outside decorators)
- No model= parameter on LLM decorators (should be llm_conn_id=)
- Every @task.embed replaced with plain @task
- pydanticai connection configured in .env or connections.yaml
- requirements.txt has apache-airflow-providers-common-ai[...] instead of airflow-ai-sdk[...]

These features are available after migration but have no airflow-ai-sdk equivalent:
| Feature | Parameter | Description |
|---------|-----------|-------------|
| HITL approval | require_approval=True on @task.llm | Pause for human review before returning |
| HITL review loop | enable_hitl_review=True on @task.agent | Iterative review with regeneration |
| Durable execution | durable=True on @task.agent | Step-level caching for resilience |
| Tool logging | enable_tool_logging=True on @task.agent | INFO-level tool call logs (default: on) |
| Model override | model_id="openai:gpt-5" | Override connection's model per-task |
| File analysis | @task.llm_file_analysis | Analyze files/images via ObjectStoragePath |
| NL-to-SQL | @task.llm_sql | Generate SQL from natural language |