engineering/ai-ml-engineering/skills/data-pipeline-engineering/SKILL.md
This skill should be used when the user asks about "data pipeline", "ETL", "ELT", "data engineering", "Apache Airflow", "Prefect", "Dagster", "dbt", "Spark", "Kafka", "streaming pipeline", "batch pipeline", "data ingestion", "data transformation", "data quality", "data validation", "Great Expectations", "feature store", "feature engineering", "feature pipeline", "data lake", "data warehouse", "medallion architecture", "bronze silver gold", "Parquet", "Iceberg", "Delta Lake", "dbt model", "pipeline orchestration", "data lineage", "backfill", "incremental processing", "change data capture", "CDC", or "real-time data". Also trigger for "my pipeline is slow", "data is duplicated", "pipeline is failing", or "how do I build a data pipeline for ML".
Design, build, and operate reliable data pipelines for ML and analytics workloads.
| Concern | Batch | Streaming |
|---------|-------|-----------|
| Latency requirement | Hours to days | Seconds to minutes |
| Data volume | Any | Very high throughput |
| Complexity | Lower | Higher |
| Cost | Lower | Higher |
| Use cases | Daily ML training, reports | Fraud detection, recommendations |
Start with batch. Only move to streaming if latency requirements genuinely demand it.
Raw data (landing) → Bronze (raw ingest) → Silver (cleaned/validated) → Gold (aggregated/features)
| Layer | Contents | Transformation |
|-------|---------|---------------|
| Bronze | Raw data, unchanged. Schema-on-read. | Ingest only: add metadata (source, ingest timestamp) |
| Silver | Typed, deduplicated, validated. | Apply schema, cast types, remove invalid rows |
| Gold | Aggregated, business-friendly. | Joins, aggregations, feature computation |
Store all layers as Parquet (or Delta/Iceberg for ACID transactions). Always preserve Bronze — it's your source of truth for reprocessing.
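The layer responsibilities above can be sketched in plain Python. This is a minimal illustration, not a production layout: the record fields (`order_id`, `customer_id`, `amount`), the function names, and the in-memory lists standing in for Parquet tables are all assumptions made for the example.

```python
from collections import defaultdict
from datetime import datetime, timezone


def to_bronze(raw_rows, source):
    """Ingest only: keep payloads unchanged, add source and ingest timestamp."""
    ingested_at = datetime.now(timezone.utc).isoformat()
    return [
        {"payload": dict(row), "source": source, "ingested_at": ingested_at}
        for row in raw_rows
    ]


def to_silver(bronze_rows):
    """Apply a schema, cast types, drop invalid rows, deduplicate on order_id."""
    seen, silver = set(), []
    for record in bronze_rows:
        payload = record["payload"]
        try:
            row = {
                "order_id": str(payload["order_id"]),
                "customer_id": str(payload["customer_id"]),
                "amount": float(payload["amount"]),
            }
        except (KeyError, TypeError, ValueError):
            continue  # invalid row: missing field or uncastable value
        if row["amount"] < 0 or row["order_id"] in seen:
            continue  # out-of-range amount or duplicate key
        seen.add(row["order_id"])
        silver.append(row)
    return silver


def to_gold(silver_rows):
    """Aggregate to a business-friendly shape: total spend per customer."""
    totals = defaultdict(float)
    for row in silver_rows:
        totals[row["customer_id"]] += row["amount"]
    return dict(totals)


# Hypothetical raw input with one duplicate and one uncastable amount.
raw = [
    {"order_id": 1, "customer_id": "c1", "amount": "10.50"},
    {"order_id": 1, "customer_id": "c1", "amount": "10.50"},
    {"order_id": 2, "customer_id": "c1", "amount": "4.50"},
    {"order_id": 3, "customer_id": "c2", "amount": "not-a-number"},
    {"order_id": 4, "customer_id": "c2", "amount": "7"},
]
bronze = to_bronze(raw, source="orders_api")
silver = to_silver(bronze)
gold = to_gold(silver)
```

Because Bronze keeps every raw payload, including the duplicate and the invalid row, Silver and Gold can be rebuilt later if the cleaning rules change.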
Never trust data. Validate at every layer boundary.
# Note: this uses the validator-style fluent API from the Great Expectations
# 0.17/0.18 line; other versions differ.
# `df` is assumed to be an existing pandas DataFrame of orders.
import great_expectations as gx

context = gx.get_context()
datasource = context.sources.add_pandas("my_source")
asset = datasource.add_dataframe_asset("orders")
batch_request = asset.build_batch_request(dataframe=df)

# Define expectations
suite = context.add_expectation_suite("orders_suite")
validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name="orders_suite",
)

# Column existence
validator.expect_column_to_exist("order_id")
validator.expect_column_to_exist("customer_id")
validator.expect_column_to_exist("amount")

# No nulls in critical columns
validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_not_be_null("customer_id")

# Range validation
validator.expect_column_values_to_be_between("amount", min_value=0, max_value=100_000)

# Uniqueness
validator.expect_column_values_to_be_unique("order_id")

# Enum validation
validator.expect_column_values_to_be_in_set(
    "status", ["pending", "completed", "cancelled", "refunded"]
)

# Date format
validator.expect_column_values_to_match_strftime_format("created_at", "%Y-%m-%dT%H:%M:%S")

# Save and validate
validator.save_expectation_suite()
results = validator.validate()
if not results.success:
    failed_expectations = [r for r in results.results if not r.success]
    raise ValueError(f"Data quality check failed: {len(failed_expectations)} expectations failed")
import pandas as pd
from dataclasses import dataclass


@dataclass
class DataQualityResult:
    passed: bool
    issues: list[str]


def validate_orders(df: pd.DataFrame) -> DataQualityResult:
    issues = []

    # Required columns
    required = ["order_id", "customer_id", "amount", "created_at"]
    missing = [col for col in required if col not in df.columns]
    if missing:
        issues.append(f"Missing columns: {missing}")

    # Null checks (only on required columns that exist, to avoid a KeyError)
    present = [col for col in required if col in df.columns]
    null_counts = df[present].isnull().sum()
    for col, count in null_counts.items():
        if count > 0:
            issues.append(f"NULL values in '{col}': {count} rows ({count/len(df):.1%})")

    # Range checks
    if "amount" in df.columns:
        invalid = df["amount"] < 0
        if invalid.any():
            issues.append(f"Negative amounts: {invalid.sum()} rows")

    # Duplicate primary keys
    if "order_id" in df.columns:
        dups = df["order_id"].duplicated()
        if dups.any():
            issues.append(f"Duplicate order_ids: {dups.sum()}")

    return DataQualityResult(passed=len(issues) == 0, issues=issues)
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule

# The python_callable functions referenced below (extract_orders_from_postgres,
# run_data_quality_checks, and so on) are assumed to be defined or imported elsewhere.

default_args = {
    "owner": "data-engineering",
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "retry_exponential_backoff": True,
    "email_on_failure": True,
    "email": ["[email protected]"],
}

with DAG(
    dag_id="ml_feature_pipeline",
    description="Daily feature engineering pipeline for churn model",
    schedule="0 4 * * *",  # 4 AM UTC daily
    start_date=datetime(2026, 1, 1),
    catchup=False,  # Don't backfill historical runs
    max_active_runs=1,  # Only one run at a time
    default_args=default_args,
    tags=["ml", "features", "churn"],
) as dag:
    start = EmptyOperator(task_id="start")

    extract_orders = PythonOperator(
        task_id="extract_orders",
        python_callable=extract_orders_from_postgres,
        op_kwargs={"date": "{{ ds }}"},  # Airflow macro: the run's logical date (YYYY-MM-DD)
        pool="postgres_pool",  # Limit concurrent DB connections
    )

    extract_events = PythonOperator(
        task_id="extract_events",
        python_callable=extract_events_from_bigquery,
        op_kwargs={"date": "{{ ds }}"},
    )

    validate_data = PythonOperator(
        task_id="validate_data",
        python_callable=run_data_quality_checks,
        op_kwargs={"date": "{{ ds }}"},
    )

    compute_features = PythonOperator(
        task_id="compute_features",
        python_callable=compute_churn_features,
        op_kwargs={"date": "{{ ds }}"},
    )

    load_feature_store = PythonOperator(
        task_id="load_feature_store",
        python_callable=write_to_feature_store,
        op_kwargs={"date": "{{ ds }}"},
    )

    notify_success = PythonOperator(
        task_id="notify_success",
        python_callable=send_slack_notification,
        op_kwargs={"status": "success"},
        trigger_rule=TriggerRule.ALL_SUCCESS,
    )

    notify_failure = PythonOperator(
        task_id="notify_failure",
        python_callable=send_slack_notification,
        op_kwargs={"status": "failure"},
        trigger_rule=TriggerRule.ONE_FAILED,
    )

    # DAG dependencies
    start >> [extract_orders, extract_events] >> validate_data
    validate_data >> compute_features >> load_feature_store
    load_feature_store >> [notify_success, notify_failure]
All pipeline tasks must be idempotent — running the same task twice with the same inputs produces the same result.
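import pandas as pd
from sqlalchemy import text

# `engine` is assumed to be a SQLAlchemy Engine created elsewhere via create_engine(...).


def load_to_warehouse(date: str, df: pd.DataFrame):
    """Idempotent load: delete existing data for the date, then insert."""
    # engine.begin() opens one transaction: both steps commit together or roll back together
    with engine.begin() as conn:
        # Delete existing data for this date partition
        conn.execute(
            text("DELETE FROM features WHERE date = :date"),
            {"date": date}
        )
        # Insert fresh data
        df.to_sql("features", conn, if_exists="append", index=False)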
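To see why the delete-then-insert pattern is idempotent, here is a small self-contained sketch using the standard library's `sqlite3` in place of a real warehouse. The table layout and the `load_partition` helper are assumptions for illustration only.

```python
import sqlite3


def load_partition(conn, date, rows):
    """Replace the partition for `date` in one transaction: delete, then insert."""
    with conn:  # commits on success, rolls back if either statement fails
        conn.execute("DELETE FROM features WHERE date = ?", (date,))
        conn.executemany(
            "INSERT INTO features (date, customer_id, total_spend) VALUES (?, ?, ?)",
            [(date, customer_id, spend) for customer_id, spend in rows],
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE features (date TEXT, customer_id TEXT, total_spend REAL)")

rows = [("c1", 15.0), ("c2", 7.0)]
load_partition(conn, "2026-03-01", rows)
load_partition(conn, "2026-03-01", rows)  # rerun, for example after a task retry

row_count = conn.execute("SELECT COUNT(*) FROM features").fetchone()[0]
```

Running the load twice leaves two rows, not four. A plain append without the delete would double the partition on every retry.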
| Feature type | Raw form | Encoding |
|-------------|---------|---------|
| Categorical (low cardinality) | "gold", "silver", "bronze" | One-hot encoding |
| Categorical (high cardinality) | user_id, product_id | Embedding or target encoding |
| Numeric | age, price | Standard scaling or quantile normalization |
| Text | review text, description | TF-IDF, embeddings |
| Datetime | 2026-03-01 09:14:00 | Hour, day-of-week, is_weekend, recency |
| Geospatial | lat/lon | Geohash buckets, distance from reference point |
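Two rows of the table, sketched with the standard library only. The helper names, the `tier` prefix, and the reference date used for recency are hypothetical choices for this example. In practice a library such as pandas or scikit-learn would usually do this work.

```python
from datetime import datetime


def encode_datetime(ts, reference):
    """Expand one timestamp into numeric features."""
    return {
        "hour": ts.hour,
        "day_of_week": ts.weekday(),  # Monday = 0 ... Sunday = 6
        "is_weekend": int(ts.weekday() >= 5),
        "recency_days": (reference - ts).days,  # whole days before the reference
    }


def one_hot(value, categories, prefix):
    """One-hot encode a low-cardinality categorical value."""
    return {f"{prefix}_{c}": int(value == c) for c in categories}


datetime_features = encode_datetime(
    datetime(2026, 3, 1, 9, 14, 0),
    reference=datetime(2026, 3, 15, 0, 0, 0),
)
tier_features = one_hot("silver", ["gold", "silver", "bronze"], prefix="tier")
```

The reference timestamp for recency should be the prediction time, not the time the pipeline happens to run, so the feature means the same thing in training and serving.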
Target leakage is one of the most common ML engineering bugs. It happens when features contain information from the future, that is, information that would not be available at prediction time.
# WRONG — uses order total to predict churn, but total isn't known at prediction time
features["total_spend_this_month"] = df.groupby("customer_id")["amount"].transform("sum")
# CORRECT — use only data available at decision time (last month's spend)
features["total_spend_last_month"] = df[df["order_date"] < cutoff_date].groupby("customer_id")["amount"].sum()
Rule: For churn/conversion/retention models, features must only use data from before the label period starts.
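A minimal sketch of that rule, assuming a single cutoff date marks the start of the label period. The order records and the churn definition (no orders on or after the cutoff) are made up for illustration.

```python
from datetime import date


def split_at_cutoff(orders, cutoff):
    """Features use orders strictly before the cutoff; labels use the rest."""
    feature_rows = [o for o in orders if o["order_date"] < cutoff]
    label_rows = [o for o in orders if o["order_date"] >= cutoff]
    return feature_rows, label_rows


def spend_per_customer(rows):
    """Sum order amounts per customer."""
    totals = {}
    for row in rows:
        totals[row["customer_id"]] = totals.get(row["customer_id"], 0.0) + row["amount"]
    return totals


orders = [
    {"customer_id": "c1", "order_date": date(2026, 1, 20), "amount": 30.0},
    {"customer_id": "c1", "order_date": date(2026, 2, 10), "amount": 99.0},  # label period
    {"customer_id": "c2", "order_date": date(2026, 1, 5), "amount": 12.0},
]
cutoff = date(2026, 2, 1)  # the label period starts here

feature_rows, label_rows = split_at_cutoff(orders, cutoff)
features = spend_per_customer(feature_rows)

# Assumed label definition: a customer with no orders in the label period has churned.
active = {o["customer_id"] for o in label_rows}
churned = {customer: customer not in active for customer in features}
```

The 99.0 order never reaches the feature computation, so the model cannot learn from spending that happened inside the window it is trying to predict.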
from datetime import datetime
from pathlib import Path

import pandas as pd


class FeatureStore:
    """Simple feature store interface."""

    def write_features(self, entity: str, features: pd.DataFrame,
                       as_of_date: datetime):
        """Write features for an entity at a specific point in time."""
        # assign() returns a copy, so the caller's DataFrame is not mutated
        features = features.assign(entity=entity, as_of_date=as_of_date)
        path = Path(f"features/{entity}/{as_of_date.date()}.parquet")
        path.parent.mkdir(parents=True, exist_ok=True)  # ensure the target directory exists
        features.to_parquet(path)

    def get_features(self, entity: str, feature_names: list[str],
                     as_of_date: datetime) -> pd.DataFrame:
        """Get historical feature values (point-in-time correct)."""
        # Find the most recent feature snapshot before as_of_date
        # This is the core operation that prevents leakage in training
        ...
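The point-in-time lookup that the `get_features` stub describes could look roughly like the following. This is a sketch under stated assumptions, not the interface's actual implementation: `InMemoryFeatureStore` is a hypothetical in-memory stand-in for the Parquet-backed store, it keys snapshots by a single entity ID, and it treats a snapshot dated exactly `as_of_date` as visible (use `bisect_left` for a strict "before").

```python
from bisect import bisect_right
from datetime import date


class InMemoryFeatureStore:
    """Hypothetical in-memory feature store, for illustration only."""

    def __init__(self):
        # entity -> list of (as_of_date, features) tuples, kept sorted by date
        self._snapshots = {}

    def write_features(self, entity, features, as_of_date):
        snapshots = self._snapshots.setdefault(entity, [])
        # Replace any snapshot for the same date so reruns stay idempotent.
        snapshots[:] = [s for s in snapshots if s[0] != as_of_date]
        snapshots.append((as_of_date, dict(features)))
        snapshots.sort(key=lambda s: s[0])

    def get_features(self, entity, feature_names, as_of_date):
        """Return the latest snapshot written on or before as_of_date, or None."""
        snapshots = self._snapshots.get(entity, [])
        dates = [s[0] for s in snapshots]
        idx = bisect_right(dates, as_of_date)
        if idx == 0:
            return None  # nothing was known about this entity yet
        _, features = snapshots[idx - 1]
        return {name: features.get(name) for name in feature_names}


store = InMemoryFeatureStore()
store.write_features("c1", {"total_spend": 30.0}, date(2026, 1, 31))
store.write_features("c1", {"total_spend": 129.0}, date(2026, 2, 28))

training_row = store.get_features("c1", ["total_spend"], date(2026, 2, 15))
too_early = store.get_features("c1", ["total_spend"], date(2026, 1, 1))
```

A training example dated 2026-02-15 sees the January snapshot, not the later February one, which is what keeps future information out of the training set.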
When you genuinely need real-time (< 1 minute latency):
import json
import logging

from confluent_kafka import Consumer, Producer

logger = logging.getLogger(__name__)

# handle_error, compute_realtime_features, write_to_feature_store and send_to_dlq
# are assumed to be defined elsewhere.

# Consumer
consumer = Consumer({
    "bootstrap.servers": "kafka:9092",
    "group.id": "ml-feature-pipeline",
    "auto.offset.reset": "latest",
    "enable.auto.commit": False,  # Manual commit: ensures at-least-once processing
})
consumer.subscribe(["orders"])

# Processing loop
try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            handle_error(msg.error())
            continue
        order = json.loads(msg.value())
        try:
            features = compute_realtime_features(order)
            write_to_feature_store(features)
            consumer.commit(msg)  # Commit only after successful processing
        except Exception as e:
            logger.error(f"Failed to process message: {e}", exc_info=True)
            # Send to dead letter queue. The offset is not committed here, but the
            # next successful commit will advance past this message.
            send_to_dlq(msg)
finally:
    consumer.close()  # Leave the consumer group cleanly on shutdown
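Committing only after successful processing gives at-least-once delivery, so a message can be processed twice if the consumer stops between the write and the commit. The simulation below shows why the feature-store write should therefore be a keyed upsert. It uses plain lists and dicts, not Kafka, and the `consume` function and its crash parameter are hypothetical.

```python
def consume(messages, committed_offset, store, crash_before_commit_at=None):
    """Process messages from committed_offset onward; return the new committed offset."""
    for offset in range(committed_offset, len(messages)):
        order = messages[offset]
        store[order["order_id"]] = order["amount"]  # keyed upsert: safe to repeat
        if offset == crash_before_commit_at:
            # Simulated crash: the write happened, but the offset was never committed.
            return committed_offset
        committed_offset = offset + 1  # commit only after successful processing
    return committed_offset


messages = [
    {"order_id": "o1", "amount": 10.0},
    {"order_id": "o2", "amount": 20.0},
    {"order_id": "o3", "amount": 30.0},
]
store = {}

# First run crashes right after writing o2, before committing it.
offset = consume(messages, 0, store, crash_before_commit_at=1)

# After restart, consumption resumes from the last committed offset, so o2 is redelivered.
offset_after_restart = consume(messages, offset, store)
```

Because the write is keyed by `order_id`, the redelivered message overwrites its earlier result instead of creating a duplicate. An append-only write in the same position would have stored o2 twice.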
Streaming anti-patterns to avoid:
For production-ready pipeline templates and streaming infrastructure recipes, see:
references/pipeline-patterns.md: complete Airflow DAG patterns, Kafka consumer group configurations, and Spark/dbt pipeline templates with error handling and observability