plugins/ml-master/skills/ml-data-pipeline/SKILL.md
This skill should be used when the user asks to ingest, clean, validate, transform, version, monitor, or serve ML data and features. PROACTIVELY activate for: (1) data ingestion, preprocessing, feature engineering, leakage prevention, train/serving skew, (2) Spark, Dask, Polars, pandas, Ray Data, streaming pipelines, (3) Great Expectations, TFDV, Deequ, data quality and validation, (4) DVC, lakehouse tables, dataset versioning, lineage, reproducibility, (5) Feast, Tecton, Hopsworks feature stores, point-in-time joins, online/offline features. Provides: scalable, reproducible, leakage-safe ML data pipeline design.
Install this skill globally with one command: `npx skillsauth add JosiahSiegel/claude-plugin-marketplace ml-data-pipeline`. Works with Claude Code, Cursor, and Windsurf.
Use this skill for data ingestion, validation, preprocessing, feature engineering, dataset versioning, feature stores, batch and streaming pipelines, and data-quality monitoring. In ML, data pipeline correctness is often more important than model sophistication. A pipeline must produce leakage-safe training data and consistent serving features.
Choose storage based on data shape and access pattern. Object storage with Parquet/Arrow is a strong default for tabular batch ML. Delta Lake, Apache Iceberg, or Hudi add ACID tables, schema evolution, and time travel. Use warehouses for governed SQL features, vector stores for embedding retrieval, and streaming logs for online behavior. Store raw, cleaned, feature, and model-ready layers separately. For Azure Storage pointer blobs used by ADF to pass Azure ML code asset versions, load ml-azureml-adf-automation.
For large datasets, prefer columnar formats, partitioning by time or high-level domain, compression, predicate pushdown, and manifest files. Avoid many tiny files; compact when necessary. Record dataset snapshot identifiers in every training run.
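One way to record a dataset snapshot identifier is to hash a manifest of the files that made up the training set. The sketch below assumes a hypothetical manifest that maps relative file paths to content hashes; the function name `snapshot_id` and the 16-character truncation are illustrative choices, not part of any particular tool.

```python
import hashlib
import json


def snapshot_id(manifest: dict[str, str]) -> str:
    """Derive a deterministic ID from a {relative_path: content_hash} manifest."""
    # Sort keys so the order in which files were listed does not change the ID.
    canonical = json.dumps(manifest, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:16]


# Hypothetical manifest for a dataset partitioned by date.
manifest_a = {
    "date=2024-01-01/part-0.parquet": "9f2c",
    "date=2024-01-02/part-0.parquet": "a41b",
}
# The same files, listed in the opposite order.
manifest_b = dict(reversed(list(manifest_a.items())))

# Attach the ID to whatever metadata the training run already logs.
run_metadata = {"dataset_snapshot": snapshot_id(manifest_a), "model": "baseline"}
```

Because the ID depends only on paths and content hashes, two runs that report the same `dataset_snapshot` read the same files, and any compaction or rewrite that changes file contents produces a new ID.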
Feast is a modular feature store for maintaining online-offline feature consistency.
Feature store configuration (`feature_store.yaml`):

```yaml
project: fraud_detection
registry: data/registry.db
provider: local
online_store:
  type: redis
  connection_string: "localhost:6379"
offline_store:
  type: file
```
Feature definitions (`features.py`):

```python
from datetime import timedelta

from feast import Entity, FeatureView, Field, FileSource, ValueType
from feast.types import Float32, Int64

# Entity keyed on user_id; join_keys names the column used to join features.
user = Entity(name="user_id", value_type=ValueType.INT64, join_keys=["user_id"])

user_transactions_source = FileSource(
    path="data/user_transactions.parquet",
    # `timestamp_field` replaces the deprecated `event_timestamp_column`
    # in the Feast releases that provide `Field` and `feast.types`.
    timestamp_field="timestamp",
    created_timestamp_column="created_timestamp",
)

user_transactions_fv = FeatureView(
    name="user_transactions_feature_view",
    entities=[user],
    ttl=timedelta(days=90),
    schema=[
        Field(name="transaction_count_30d", dtype=Int64),
        Field(name="total_amount_30d", dtype=Float32),
    ],
    online=True,
    source=user_transactions_source,
)
```
| Engine | Best fit |
|---|---|
| pandas | Small to medium in-memory exploration and simple pipelines |
| Polars | Fast local/lazy columnar processing, larger-than-pandas workloads |
| Spark | Large distributed ETL, lakehouse workflows, SQL + MLlib integration |
| Dask | Python-native distributed arrays/dataframes and custom workloads |
| Ray Data | ML-centric distributed preprocessing integrated with Ray Train/Tune/Serve |
| Beam/Flink/Spark Streaming | Streaming or unified batch/stream dataflows |
| Airflow/Prefect/Dagster | Orchestration, scheduling, retries, lineage, and dependency management |
Polars lazy evaluation optimizes the execution plan using predicate and projection pushdowns.
```python
import polars as pl


def compute_rolling_user_features(transactions_path: str) -> pl.DataFrame:
    lazy_df = (
        pl.scan_parquet(transactions_path)
        # Parse the timestamp (assumed to be stored as a string) for windowing
        .with_columns(pl.col("timestamp").str.strptime(pl.Datetime))
        # group_by_dynamic requires the index column to be sorted
        .sort("timestamp")
        # 30-day windows that advance one day at a time, per user
        .group_by_dynamic(
            index_column="timestamp",
            every="1d",
            period="30d",
            group_by="user_id",
            # Label each window by its right edge so the row stamped T
            # aggregates [T - 30d, T) and never looks past T
            label="right",
        )
        .agg([
            pl.col("amount").count().alias("transaction_count_30d"),
            pl.col("amount").sum().alias("total_amount_30d"),
            pl.col("amount").mean().alias("avg_amount_30d"),
        ])
        .filter(pl.col("user_id").is_not_null())
    )
    # collect() runs the optimized query plan and loads the result into memory
    return lazy_df.collect()
```
Prevent typical distributed processing bottlenecks in Spark, such as data skew and excessive shuffle overhead.

```python
from pyspark.sql import functions as F

NUM_SALTS = 10

# Salt the skewed side: assign each row a random bucket in [0, NUM_SALTS)
skewed_df = skewed_df.withColumn("salt", (F.rand() * NUM_SALTS).cast("int"))

# Replicate each lookup row once per salt value so every salted key finds a match
lookup_df = lookup_df.withColumn(
    "salt", F.explode(F.array([F.lit(i) for i in range(NUM_SALTS)]))
)

# Join on key AND salt to distribute the workload
joined_df = skewed_df.join(lookup_df, ["join_key", "salt"], "inner").drop("salt")
```

```python
from pyspark.sql.functions import broadcast

# Broadcast the small dimension DataFrame to every executor so the large fact table is not shuffled
optimized_joined = large_fact_df.join(broadcast(small_lookup_df), "entity_id", "inner")
```
Validate at ingestion, feature generation, training, and serving. Check schema, types, ranges, nulls, uniqueness, duplicates, categorical domains, cardinality, label distribution, timestamp monotonicity, referential integrity, text/image/audio validity, and embedding norms. Tools include Great Expectations, TensorFlow Data Validation, Deequ, pandera, dbt tests, and custom assertions.
Quality checks should fail fast for contract violations and warn for distribution changes that need investigation. Store validation reports with training artifacts. For production, monitor both raw features and post-transform model inputs.
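The split between failing fast and warning can be sketched in plain Python. This is a minimal illustration, not a substitute for Great Expectations or pandera: the column names, the `ContractViolation` exception, the `ValidationReport` class, and the 25% drift tolerance are all assumptions made for the example, and the baseline mean is assumed to be non-zero.

```python
from dataclasses import dataclass, field
from statistics import mean


class ContractViolation(Exception):
    """Raised when a batch breaks a hard data contract."""


@dataclass
class ValidationReport:
    rows: int
    warnings: list[str] = field(default_factory=list)


def validate_batch(rows, baseline_mean, drift_tolerance=0.25):
    """Raise on contract violations; collect warnings for distribution shifts."""
    if not rows:
        raise ContractViolation("empty batch")
    required = {"user_id": int, "amount": float}  # hypothetical schema
    for i, row in enumerate(rows):
        for name, expected_type in required.items():
            if row.get(name) is None:
                raise ContractViolation(f"row {i}: missing {name}")
            if not isinstance(row[name], expected_type):
                raise ContractViolation(f"row {i}: {name} is not {expected_type.__name__}")
        if row["amount"] < 0:
            raise ContractViolation(f"row {i}: negative amount")

    # Distribution checks only warn: a shift needs investigation, not a hard stop.
    report = ValidationReport(rows=len(rows))
    batch_mean = mean(r["amount"] for r in rows)
    relative_shift = abs(batch_mean - baseline_mean) / abs(baseline_mean)
    if relative_shift > drift_tolerance:
        report.warnings.append(
            f"amount mean shifted by {relative_shift:.0%} versus baseline"
        )
    return report


batch = [{"user_id": 1, "amount": 10.0}, {"user_id": 2, "amount": 30.0}]
report = validate_batch(batch, baseline_mean=20.0)
```

The returned report is the kind of object that can be serialized and stored next to the training artifacts.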
Feature engineering should be tied to the prediction time. For temporal data, compute rolling windows with correct cutoffs, delays, and late-arriving data handling. For target encoding, fit encoders inside cross-validation folds and use smoothing. For categorical features, choose native categorical support, one-hot, hashing, embeddings, or target encoding based on cardinality and model type. For text, version tokenizers and vocabularies. For images/audio, store preprocessing parameters and augmentations.
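As a sketch of fold-safe target encoding, the function below computes each row's encoding only from rows in the other folds and shrinks small categories toward the fold's prior. The function name, the index-modulo fold assignment, and the smoothing default are assumptions chosen to keep the example short; real pipelines usually shuffle, stratify, or split by time.

```python
def oof_target_encode(categories, targets, n_folds=3, smoothing=10.0):
    """Out-of-fold target encoding with additive smoothing toward the prior."""
    n = len(categories)
    if n_folds < 2 or n < n_folds:
        raise ValueError("need at least 2 folds and at least one row per fold")
    encoded = [0.0] * n
    for fold in range(n_folds):
        # Hypothetical fold assignment: row i belongs to fold i % n_folds.
        train_idx = [i for i in range(n) if i % n_folds != fold]
        prior = sum(targets[i] for i in train_idx) / len(train_idx)

        # Per-category statistics computed from the training folds only.
        sums, counts = {}, {}
        for i in train_idx:
            c = categories[i]
            sums[c] = sums.get(c, 0.0) + targets[i]
            counts[c] = counts.get(c, 0) + 1

        # Encode the held-out rows; unseen categories fall back to the prior.
        for i in range(fold, n, n_folds):
            c = categories[i]
            encoded[i] = (sums.get(c, 0.0) + smoothing * prior) / (
                counts.get(c, 0) + smoothing
            )
    return encoded


cats = ["a", "a", "b", "b", "a", "c"]
ys = [1, 0, 1, 1, 1, 0]
enc = oof_target_encode(cats, ys, n_folds=3, smoothing=1.0)
```

Because a row's own label never enters its encoding, changing `ys[0]` leaves `enc[0]` unchanged, which is the property that keeps the target from leaking into the feature.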
Prevent leakage by asking: would this feature be known at the moment the model makes the prediction? If not, exclude it or redesign the target and prediction time.
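The "known at prediction time" question translates directly into a point-in-time lookup: for each prediction, use the latest feature value that was available at or before that moment. A minimal sketch, assuming a hypothetical per-entity history of `(available_at, value)` pairs sorted by time:

```python
from bisect import bisect_right
from datetime import datetime


def point_in_time_lookup(feature_history, prediction_time):
    """Return the latest value with available_at <= prediction_time, else None.

    feature_history: list of (available_at, value) tuples sorted by available_at.
    """
    times = [available_at for available_at, _ in feature_history]
    # bisect_right counts the entries available at or before prediction_time.
    pos = bisect_right(times, prediction_time)
    return feature_history[pos - 1][1] if pos else None


# Hypothetical history of a 30-day transaction count for one user.
history = [
    (datetime(2024, 1, 1), 3),
    (datetime(2024, 1, 10), 7),
]

value_mid = point_in_time_lookup(history, datetime(2024, 1, 5))
value_before = point_in_time_lookup(history, datetime(2023, 12, 31))
```

Returning `None` when nothing was available yet is deliberate: substituting a later value would hand the model information from the future.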
DVC ties large datasets to Git commits through lightweight metadata files, which keeps the binaries out of the Git repository.

```bash
dvc init
dvc remote add -d myremote s3://my-dvc-bucket/raw-data

# Track the dataset with DVC (creates data/raw_transactions.parquet.dvc)
dvc add data/raw_transactions.parquet

# Commit the DVC metadata and the remote configuration to Git
git add data/raw_transactions.parquet.dvc data/.gitignore .dvc/config
git commit -m "Track transactions dataset v1.0.0 via DVC"

# Push the data files to remote storage
dvc push
```

```bash
# On another machine: fetch the metadata from Git, then the matching data from the remote
git pull
dvc pull
```
Streaming ML pipelines need event-time handling, watermarks, deduplication, ordering strategy, late data behavior, exactly-once or at-least-once semantics, and replayability. Separate online feature updates from training-label generation. Keep a path to backfill or replay from durable logs when feature logic changes.
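To make the watermark, deduplication, and late-data ideas concrete, here is a small in-memory sketch. It is not how Flink, Beam, or Spark implement these mechanisms; the class name `EventTimeDeduplicator`, its fields, and the choice to route late events to a list for later replay are assumptions for illustration.

```python
from dataclasses import dataclass, field


@dataclass
class EventTimeDeduplicator:
    allowed_lateness: float  # seconds an event may trail the newest event time
    max_event_time: float = float("-inf")
    seen: dict[str, float] = field(default_factory=dict)
    late_events: list = field(default_factory=list)

    @property
    def watermark(self) -> float:
        # Events older than this are considered too late for the online path.
        return self.max_event_time - self.allowed_lateness

    def process(self, event_id: str, event_time: float) -> bool:
        """Return True if the event should update online features."""
        if event_time < self.watermark:
            # Too late for the online path: keep it for backfill or replay.
            self.late_events.append((event_id, event_time))
            return False
        if event_id in self.seen:
            return False  # duplicate delivery (at-least-once transport)
        self.seen[event_id] = event_time
        self.max_event_time = max(self.max_event_time, event_time)
        # Evict IDs behind the watermark so dedup state stays bounded.
        wm = self.watermark
        self.seen = {k: t for k, t in self.seen.items() if t >= wm}
        return True


dedup = EventTimeDeduplicator(allowed_lateness=60.0)
results = [
    dedup.process("a", 100.0),  # first delivery
    dedup.process("a", 100.0),  # duplicate
    dedup.process("b", 200.0),  # advances the watermark to 140
    dedup.process("c", 120.0),  # behind the watermark
]
```

Evicting IDs behind the watermark is safe in this sketch because any redelivery of an evicted event carries the same event time and is therefore rejected as late.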
Minimize sensitive fields, tokenize or hash identifiers where appropriate, and preserve joinability only when needed. Apply access controls by data layer. Avoid exporting raw PII into experiment trackers or model artifacts. Document retention and deletion policies.
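Where identifiers must stay joinable across tables, a keyed hash is one common option: the same input and key always yield the same token, but the token cannot be recomputed without the key. A minimal sketch using the standard library; the function name `pseudonymize` and the example keys are hypothetical, and in practice the key would come from a secrets manager rather than source code.

```python
import hashlib
import hmac


def pseudonymize(identifier: str, secret_key: bytes) -> str:
    """Replace an identifier with a keyed HMAC-SHA256 token (hex encoded)."""
    return hmac.new(secret_key, identifier.encode("utf-8"), hashlib.sha256).hexdigest()


# Hypothetical keys; real keys should never be hard-coded.
key_features = b"example-key-for-feature-layer"
key_exports = b"example-key-for-exports"

token_a = pseudonymize("user-12345", key_features)
token_b = pseudonymize("user-12345", key_features)
token_other_key = pseudonymize("user-12345", key_exports)
```

Using a different key per data layer or export removes joinability between those layers, which is one way to preserve it only where it is needed.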