plugins/mlflow-integration/skills/mlflow-patterns/SKILL.md
# MLflow Patterns Expert patterns for MLflow tracking server, model registry, custom flavors, and deployment. ## Pattern 1: Production MLflow Server Setup PostgreSQL backend + S3 artifact store for team deployment. ```bash # Infrastructure # PostgreSQL for run metadata (reliable, concurrent, queryable) # S3 for artifacts (scalable, durable, cheap) # 1. Create PostgreSQL database psql -c "CREATE DATABASE mlflow; CREATE USER mlflow_user WITH PASSWORD 'secure_password'; GRANT ALL PRIVILEGES ON
npx skillsauth add hermeticormus/libremlops-claude-code plugins/mlflow-integration/skills/mlflow-patternsInstall this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
3 of 9 scanners reported clean
Some scanners were skipped, did not run, or reported a non-clean status. Review each row below.
Expert patterns for MLflow tracking server, model registry, custom flavors, and deployment.
PostgreSQL backend + S3 artifact store for team deployment.
# Infrastructure
# PostgreSQL for run metadata (reliable, concurrent, queryable)
# S3 for artifacts (scalable, durable, cheap)
# 1. Create PostgreSQL database
psql -c "CREATE DATABASE mlflow; CREATE USER mlflow_user WITH PASSWORD 'secure_password'; GRANT ALL PRIVILEGES ON DATABASE mlflow TO mlflow_user;"
# 2. Start MLflow server
mlflow server \
--backend-store-uri postgresql://mlflow_user:secure_password@localhost:5432/mlflow \
--default-artifact-root s3://my-mlflow-bucket/artifacts \
--host 0.0.0.0 \
--port 5000 \
--serve-artifacts # proxy artifact requests through server (hides S3 creds from clients)
# 3. Configure clients
export MLFLOW_TRACKING_URI=http://mlflow-server:5000
export AWS_DEFAULT_REGION=us-east-1
# OR in Python:
import mlflow
mlflow.set_tracking_uri("http://mlflow-server:5000")
# Docker deployment
# docker run -p 5000:5000 \
# -e AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID \
# -e AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY \
# ghcr.io/mlflow/mlflow:latest \
# mlflow server --backend-store-uri postgresql://... --default-artifact-root s3://...
Register model with schema, input example, and stage transition.
import mlflow
import mlflow.sklearn
from mlflow.models import infer_signature
from mlflow.tracking import MlflowClient
import numpy as np
client = MlflowClient()
model_name = "fraud-detection-lgbm"
with mlflow.start_run(run_name="lgbm-v2-features") as run:
# Train
model.fit(X_train, y_train)
y_pred = model.predict(X_val)
proba = model.predict_proba(X_val)
# Signature: documents expected input/output schema
signature = infer_signature(
model_input=X_train, # DataFrame or numpy array
model_output=proba # Use probabilities, not class labels
)
# Input example: shows serving endpoint what a valid request looks like
input_example = X_train.iloc[:3].to_dict('list')
# Log model with all metadata
mlflow.sklearn.log_model(
sk_model=model,
artifact_path="model",
signature=signature,
input_example=input_example,
registered_model_name=model_name, # auto-registers to registry
extra_pip_requirements=["lightgbm>=4.0", "pandas>=2.0"]
)
# Log evaluation metrics
mlflow.log_metrics({
"val_auc": roc_auc_score(y_val, proba[:, 1]),
"val_f1": f1_score(y_val, y_pred, average="weighted"),
"val_ks": ks_stat, # KS statistic for credit scoring
})
# Transition to Staging
latest = client.get_latest_versions(model_name, stages=["None"])
for version in latest:
client.transition_model_version_stage(
name=model_name,
version=version.version,
stage="Staging",
archive_existing_versions=False
)
client.update_model_version(
name=model_name,
version=version.version,
description=f"LightGBM with v2 feature set. AUC={0.934:.3f}. Trained on data-v3."
)
# Set named alias for flexible routing
client.set_registered_model_alias(model_name, "challenger", version.version)
print(f"Registered {model_name} v{version.version} → Staging")
Define project entry points for reproducible CI/CD execution.
# MLproject
name: fraud_detection
docker_env:
image: my-registry/ml-training:1.2.3
# OR for conda:
# conda_env: conda.yaml
entry_points:
preprocess:
parameters:
data_path: {type: str}
output_path: {type: str, default: "data/processed/"}
test_size: {type: float, default: 0.2}
command: "python src/preprocess.py --data {data_path} --output {output_path} --test-size {test_size}"
train:
parameters:
data_path: {type: str, default: "data/processed/"}
learning_rate: {type: float, default: 0.05}
n_estimators: {type: int, default: 200}
max_depth: {type: int, default: 6}
experiment_name: {type: str, default: "fraud-detection"}
command: "python src/train.py --data {data_path} --lr {learning_rate} --n-estimators {n_estimators} --max-depth {max_depth} --experiment {experiment_name}"
evaluate:
parameters:
model_uri: {type: str}
test_data: {type: str, default: "data/processed/test.parquet"}
command: "python src/evaluate.py --model {model_uri} --test {test_data}"
# Run training pipeline
mlflow run . -e train -P learning_rate=0.03 -P n_estimators=300
# Run from Git
mlflow run https://github.com/org/repo.git -e train -P learning_rate=0.03 \
--version v1.2.3 # specific git tag
# Run on remote cluster
mlflow run . -e train -b databricks \
--backend-config '{"cluster_spec": {"num_workers": 4, "spark_version": "12.2.x-cpu-ml-scala2.12"}}'
Register models that don't fit standard MLflow flavors.
import mlflow.pyfunc
import mlflow
import pandas as pd
import numpy as np
import joblib
import os
class EnsembleModel(mlflow.pyfunc.PythonModel):
"""Ensemble of multiple models with preprocessing pipeline."""
def load_context(self, context):
"""Load artifacts referenced in mlflow.pyfunc.log_model(artifacts=...)."""
self.preprocessor = joblib.load(context.artifacts["preprocessor"])
self.model_1 = joblib.load(context.artifacts["model_1"])
self.model_2 = joblib.load(context.artifacts["model_2"])
self.weights = np.array([0.6, 0.4])
def predict(self, context, model_input: pd.DataFrame) -> np.ndarray:
"""Called by mlflow models serve and pyfunc.load_model().predict()."""
X = self.preprocessor.transform(model_input)
proba_1 = self.model_1.predict_proba(X)[:, 1]
proba_2 = self.model_2.predict_proba(X)[:, 1]
return self.weights[0] * proba_1 + self.weights[1] * proba_2
# Log the ensemble model
with mlflow.start_run(run_name="ensemble-v1"):
# Save artifacts to temp files
joblib.dump(preprocessor, "/tmp/preprocessor.pkl")
joblib.dump(model_1, "/tmp/model_1.pkl")
joblib.dump(model_2, "/tmp/model_2.pkl")
mlflow.pyfunc.log_model(
artifact_path="ensemble_model",
python_model=EnsembleModel(),
artifacts={
"preprocessor": "/tmp/preprocessor.pkl",
"model_1": "/tmp/model_1.pkl",
"model_2": "/tmp/model_2.pkl",
},
signature=infer_signature(X_sample, y_sample),
input_example=X_sample[:3],
registered_model_name="fraud-ensemble",
conda_env={
"channels": ["defaults", "conda-forge"],
"dependencies": ["python=3.10", "scikit-learn=1.3", "lightgbm=4.0", {"pip": ["mlflow>=2.8"]}]
}
)
Trigger deployment pipeline when a model transitions to Production.
from mlflow.tracking import MlflowClient
client = MlflowClient()
# Create webhook: POST to CI/CD endpoint on Production transition
webhook = client.create_registry_webhook(
events=["MODEL_VERSION_TRANSITIONED_TO_PRODUCTION"],
http_url_spec={
"url": "https://jenkins.internal/job/deploy-model/build",
"authorization": f"Bearer {os.environ['JENKINS_TOKEN']}",
"enable_ssl_verification": True,
},
model_name="fraud-detection-lgbm",
description="Trigger deployment pipeline on Production promotion"
)
# The webhook POST body includes:
# {
# "event": "MODEL_VERSION_TRANSITIONED_TO_PRODUCTION",
# "model_name": "fraud-detection-lgbm",
# "version": "7",
# "to_stage": "Production",
# "run_id": "abc123..."
# }
# List existing webhooks
webhooks = client.list_registry_webhooks(model_name="fraud-detection-lgbm")
for wh in webhooks:
print(f"ID: {wh.id} | URL: {wh.http_url_spec.url} | Events: {wh.events}")
SQLite has no concurrent write support. Multiple training jobs writing to the same SQLite file cause corruption or lock errors. Use PostgreSQL for any shared team deployment.
Models without signatures can't be validated at serving time. A schema mismatch between training features and serving features causes silent wrong predictions, not an error. Always log with signature=infer_signature(X_train, predictions).
Transitioning models to Production manually without evaluation criteria leads to model quality regression. Always require: (1) evaluation metrics above threshold, (2) comparison against current Production model, (3) human approval or automated gate.
The backend store (PostgreSQL) is for metadata only — run IDs, params, metrics, tags. Large files (models, datasets) belong in the artifact store (S3). Storing binary blobs in PostgreSQL is slow and expensive.
tools
# VectorDB Patterns Expert patterns for HNSW index tuning, pgvector setup, Pinecone/Qdrant upsert, metadata filtering, multi-tenancy, and embedding drift management. ## Pattern 1: pgvector Setup with HNSW Index PostgreSQL vector search with proper index configuration. ```sql -- Install extension (requires PostgreSQL 15+ with pgvector) CREATE EXTENSION IF NOT EXISTS vector; -- Table with embedding column CREATE TABLE documents ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tools
# TensorFlow Patterns Expert patterns for Keras functional API, tf.data pipeline ordering, custom layers, SavedModel export, and TFLite quantization. ## Pattern 1: Keras Functional API Model Multi-input model with proper BatchNorm and Dropout usage. ```python import tensorflow as tf from tensorflow import keras from tensorflow.keras import layers def build_classifier( numeric_dim: int, cat_vocab_sizes: dict, # {"country": 50, "device": 10} embedding_dim: int = 16, hidden_u
tools
# RAG Patterns Expert patterns for document chunking, embedding pipelines, hybrid search, cross-encoder re-ranking, and RAGAS evaluation. ## Pattern 1: Document Ingestion with Recursive Chunking Parse and chunk documents with metadata preservation. ```python from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain.document_loaders import PyPDFLoader, TextLoader from langchain.schema import Document import hashlib from pathlib import Path def ingest_documents(file_pa
tools
# PyTorch Patterns Expert patterns for custom Dataset/DataLoader, nn.Module design, model surgery, custom autograd, and profiling. ## Pattern 1: Custom Dataset with Transforms Production Dataset with augmentation pipeline and weighted sampling. ```python import torch from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler import pandas as pd import numpy as np from pathlib import Path from PIL import Image import albumentations as A from albumentations.pytorch import ToTensor