.claude/skills/ts-dagster/SKILL.md
Dagster is a data pipeline orchestrator built around the concept of software-defined assets. Learn to define assets, ops, jobs, schedules, sensors, and resources for building maintainable data platforms.
Dagster organizes data pipelines around software-defined assets — declarations of the data artifacts your pipeline produces. Assets track lineage, enable incremental computation, and integrate with the Dagster UI.
# Install Dagster and UI
pip install dagster dagster-webserver
# Create a new project
dagster project scaffold --name my_pipeline
cd my_pipeline
pip install -e ".[dev]"
# Start the dev server
dagster dev
# UI at http://localhost:3000
# my_pipeline/assets.py: Define assets that produce data
from dagster import asset, AssetExecutionContext
import pandas as pd


@asset(group_name="raw")
def raw_users(context: AssetExecutionContext) -> pd.DataFrame:
    """Fetch raw user data from API."""
    import httpx

    response = httpx.get("https://api.example.com/users")
    df = pd.DataFrame(response.json())
    context.log.info(f"Fetched {len(df)} users")
    return df


@asset(group_name="raw")
def raw_orders(context: AssetExecutionContext) -> pd.DataFrame:
    """Fetch raw order data from API."""
    import httpx

    response = httpx.get("https://api.example.com/orders")
    return pd.DataFrame(response.json())
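
# The parameter names already declare raw_users and raw_orders as upstream
# assets; listing the same assets in deps= as well makes Dagster reject the
# definition, so deps is omitted here.
@asset(group_name="analytics")
def revenue_by_user(raw_users: pd.DataFrame, raw_orders: pd.DataFrame) -> pd.DataFrame:
    """Calculate total revenue per user."""
    # Assumes both frames carry an "id" column, so after the merge the
    # order id is suffixed to "id_x" and the user id to "id_y".
    merged = raw_orders.merge(raw_users, left_on="user_id", right_on="id")
    result = (
        merged.groupby(["user_id", "name"])
        .agg(total_revenue=("amount", "sum"), order_count=("id_x", "count"))
        .reset_index()
    )
    return result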
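
The `id_x` column in the aggregation comes from pandas' merge suffixes: when both frames have an `id` column, the left frame's copy becomes `id_x` and the right frame's becomes `id_y`. The sketch below reproduces the same aggregation outside Dagster so the logic can be checked in isolation. The helper name `revenue_by_user_frame` and the sample rows are made up for illustration; the real API payloads may have different columns.

```python
import pandas as pd


def revenue_by_user_frame(users: pd.DataFrame, orders: pd.DataFrame) -> pd.DataFrame:
    """Same aggregation as the asset body, as a plain function (hypothetical helper)."""
    merged = orders.merge(users, left_on="user_id", right_on="id")
    return (
        merged.groupby(["user_id", "name"])
        .agg(total_revenue=("amount", "sum"), order_count=("id_x", "count"))
        .reset_index()
    )


# Made-up sample data standing in for the API responses.
users = pd.DataFrame({"id": [1, 2], "name": ["Ada", "Grace"]})
orders = pd.DataFrame(
    {"id": [10, 11, 12], "user_id": [1, 1, 2], "amount": [5.0, 7.5, 3.0]}
)

summary = revenue_by_user_frame(users, orders)
print(summary)
```

Keeping the transformation in a plain function like this is one way to unit-test it without starting Dagster; the asset body can then call the function.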
# my_pipeline/resources.py: Configurable resources for external systems
from dagster import ConfigurableResource
import sqlalchemy


class DatabaseResource(ConfigurableResource):
    connection_string: str

    def query(self, sql: str) -> list:
        """Run a SELECT and return each row as a dict."""
        engine = sqlalchemy.create_engine(self.connection_string)
        with engine.connect() as conn:
            result = conn.execute(sqlalchemy.text(sql))
            # Build the list while the connection is still open
            return [dict(row._mapping) for row in result]

    def execute(self, sql: str):
        """Run a statement and commit it."""
        engine = sqlalchemy.create_engine(self.connection_string)
        with engine.connect() as conn:
            conn.execute(sqlalchemy.text(sql))
            conn.commit()  # Connection.commit() is the SQLAlchemy 2.0-style API
# my_pipeline/db_assets.py: Assets that use database resources
from dagster import asset, AssetExecutionContext
from .resources import DatabaseResource


@asset(group_name="warehouse")
def dim_users(context: AssetExecutionContext, database: DatabaseResource):
    """Load cleaned user dimension table into warehouse."""
    users = database.query("SELECT id, name, email, created_at FROM raw_users")
    context.log.info(f"Loaded {len(users)} users into warehouse")
    return users
# my_pipeline/__init__.py: Wire everything together
from dagster import Definitions, load_assets_from_modules
from . import assets, db_assets
from .resources import DatabaseResource
all_assets = load_assets_from_modules([assets, db_assets])

defs = Definitions(
    assets=all_assets,
    resources={
        "database": DatabaseResource(
            connection_string="postgresql://user:pass@localhost:5432/analytics"
        ),
    },
)
# my_pipeline/schedules.py: Time-based and event-based triggers
from dagster import (
    ScheduleDefinition,
    define_asset_job,
    sensor,
    RunRequest,
    SensorEvaluationContext,
    AssetSelection,
)

# Job that materializes specific assets
analytics_job = define_asset_job(
    name="analytics_job",
    selection=AssetSelection.groups("analytics"),
)

# Cron schedule
daily_analytics = ScheduleDefinition(
    job=analytics_job,
    cron_schedule="0 6 * * *",  # 6 AM daily
)
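
# Sensor: trigger on an external event
@sensor(job=analytics_job, minimum_interval_seconds=60)
def new_file_sensor(context: SensorEvaluationContext):
    import os

    files = os.listdir("/data/incoming")
    # Sort so the order is stable; os.listdir returns entries in arbitrary order
    new_files = sorted(f for f in files if f.endswith(".csv"))
    if new_files:
        context.log.info(f"Found {len(new_files)} CSV files")
    # One request per file. Dagster skips a run_key it has already launched
    # for this sensor, so files seen on earlier ticks are not re-run.
    for filename in new_files:
        yield RunRequest(run_key=filename)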
# my_pipeline/partitioned.py: Time-partitioned assets for incremental processing
from dagster import asset, AssetExecutionContext, DailyPartitionsDefinition

daily_partitions = DailyPartitionsDefinition(start_date="2026-01-01")


@asset(partitions_def=daily_partitions, group_name="raw")
def daily_events(context: AssetExecutionContext):
    """Fetch events for a specific date partition."""
    date = context.partition_key  # e.g., "2026-02-19"
    context.log.info(f"Processing events for {date}")
    # Fetch only this date's data. fetch_events is not defined in this file;
    # supply your own loader that accepts the date string.
    return fetch_events(date)
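
`fetch_events` is left undefined in the snippet above. One way such a helper could work is to turn the partition key into a half-open UTC time window and filter on it. The sketch below is an assumption about its shape: `partition_window`, the in-memory `EVENTS` list, and the event fields are all hypothetical stand-ins for a real event source.

```python
from datetime import datetime, timedelta, timezone

# Hypothetical in-memory event store standing in for a real API or table.
EVENTS = [
    {"id": 1, "ts": datetime(2026, 2, 18, 23, 59, tzinfo=timezone.utc)},
    {"id": 2, "ts": datetime(2026, 2, 19, 0, 0, tzinfo=timezone.utc)},
    {"id": 3, "ts": datetime(2026, 2, 19, 18, 30, tzinfo=timezone.utc)},
    {"id": 4, "ts": datetime(2026, 2, 20, 0, 0, tzinfo=timezone.utc)},
]


def partition_window(partition_key: str) -> tuple[datetime, datetime]:
    """Convert a daily partition key such as "2026-02-19" into [start, end) in UTC."""
    start = datetime.strptime(partition_key, "%Y-%m-%d").replace(tzinfo=timezone.utc)
    return start, start + timedelta(days=1)


def fetch_events(partition_key: str) -> list[dict]:
    """Return only the events whose timestamp falls inside the partition's day."""
    start, end = partition_window(partition_key)
    return [event for event in EVENTS if start <= event["ts"] < end]


print([event["id"] for event in fetch_events("2026-02-19")])  # [2, 3]
```

The half-open window means an event at exactly midnight belongs to the day that starts then, so no event is counted in two partitions.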
# cli.sh: Common Dagster CLI commands
# Development server
dagster dev
# Materialize assets
dagster asset materialize --select raw_users,raw_orders
# List assets
dagster asset list
# Run a job
dagster job execute -j analytics_job
# Check definitions
dagster definitions validate