skills/model-monitoring/SKILL.md
Monitor model performance, detect data drift, concept drift, and anomalies in production using Prometheus, Grafana, and MLflow
npx skillsauth add aj-geddes/useful-ai-prompts Model MonitoringInstall this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
3 of 9 scanners reported clean
Some scanners were skipped, did not run, or reported a non-clean status. Review each row below.
Monitoring deployed machine learning models ensures they continue to perform well in production, detecting data drift, concept drift, and performance degradation.
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from scipy import stats
import json
from datetime import datetime, timedelta
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
print("=== 1. Production Monitoring System ===")
class ModelMonitoringSystem:
def __init__(self, model, scaler, baseline_data, baseline_targets):
self.model = model
self.scaler = scaler
self.baseline_data = baseline_data
self.baseline_targets = baseline_targets
self.baseline_mean = baseline_data.mean(axis=0)
self.baseline_std = baseline_data.std(axis=0)
self.baseline_predictions = model.predict(baseline_data)
self.metrics_history = []
self.drift_alerts = []
self.performance_history = []
def log_predictions(self, X, y_true, y_pred):
"""Log predictions and compute metrics"""
timestamp = datetime.now()
# Compute metrics
accuracy = accuracy_score(y_true, y_pred)
precision = precision_score(y_true, y_pred, average='weighted', zero_division=0)
recall = recall_score(y_true, y_pred, average='weighted', zero_division=0)
f1 = f1_score(y_true, y_pred, average='weighted', zero_division=0)
metric_record = {
'timestamp': timestamp,
'accuracy': accuracy,
'precision': precision,
'recall': recall,
'f1': f1,
'n_samples': len(X)
}
self.metrics_history.append(metric_record)
return metric_record
def detect_data_drift(self, X_new):
"""Detect data drift using Kolmogorov-Smirnov test"""
drift_detected = False
drift_features = []
for feature_idx in range(X_new.shape[1]):
baseline_feature = self.baseline_data[:, feature_idx]
new_feature = X_new[:, feature_idx]
# KS Test
ks_statistic, p_value = stats.ks_2samp(baseline_feature, new_feature)
if p_value < 0.05: # Significant drift detected
drift_detected = True
drift_features.append({
'feature_index': feature_idx,
'ks_statistic': float(ks_statistic),
'p_value': float(p_value)
})
if drift_detected:
alert = {
'timestamp': datetime.now(),
'type': 'data_drift',
'severity': 'high',
'drifted_features': drift_features,
'n_drifted': len(drift_features)
}
self.drift_alerts.append(alert)
logger.warning(f"Data drift detected in {len(drift_features)} features")
return drift_detected, drift_features
def detect_output_drift(self, y_pred_new):
"""Detect drift in model predictions"""
baseline_pred_dist = pd.Series(self.baseline_predictions).value_counts(normalize=True)
new_pred_dist = pd.Series(y_pred_new).value_counts(normalize=True)
# Compare distributions
classes = set(baseline_pred_dist.index) | set(new_pred_dist.index)
chi2_stat = 0
for cls in classes:
exp = baseline_pred_dist.get(cls, 0.01)
obs = new_pred_dist.get(cls, 0.01)
chi2_stat += (obs - exp) ** 2 / max(exp, 0.01)
p_value = 1 - stats.chi2.cdf(chi2_stat, len(classes) - 1)
if p_value < 0.05:
alert = {
'timestamp': datetime.now(),
'type': 'output_drift',
'severity': 'medium',
'chi2_statistic': float(chi2_stat),
'p_value': float(p_value)
}
self.drift_alerts.append(alert)
logger.warning("Output drift detected in predictions")
return True
return False
def detect_performance_degradation(self, y_true, y_pred):
"""Detect if model performance has degraded"""
current_accuracy = accuracy_score(y_true, y_pred)
baseline_accuracy = accuracy_score(self.baseline_targets, self.baseline_predictions)
degradation_threshold = 0.05 # 5% drop
degradation = baseline_accuracy - current_accuracy
if degradation > degradation_threshold:
alert = {
'timestamp': datetime.now(),
'type': 'performance_degradation',
'severity': 'critical',
'baseline_accuracy': float(baseline_accuracy),
'current_accuracy': float(current_accuracy),
'degradation': float(degradation)
}
self.drift_alerts.append(alert)
logger.error("Performance degradation detected")
return True
return False
def get_monitoring_report(self):
"""Generate monitoring report"""
if not self.metrics_history:
return {}
metrics_df = pd.DataFrame(self.metrics_history)
return {
'monitoring_period': {
'start': self.metrics_history[0]['timestamp'].isoformat(),
'end': self.metrics_history[-1]['timestamp'].isoformat(),
'n_batches': len(self.metrics_history)
},
'performance_summary': {
'avg_accuracy': float(metrics_df['accuracy'].mean()),
'min_accuracy': float(metrics_df['accuracy'].min()),
'max_accuracy': float(metrics_df['accuracy'].max()),
'std_accuracy': float(metrics_df['accuracy'].std()),
'avg_f1': float(metrics_df['f1'].mean())
},
'drift_summary': {
'total_alerts': len(self.drift_alerts),
'data_drift_alerts': sum(1 for a in self.drift_alerts if a['type'] == 'data_drift'),
'output_drift_alerts': sum(1 for a in self.drift_alerts if a['type'] == 'output_drift'),
'performance_alerts': sum(1 for a in self.drift_alerts if a['type'] == 'performance_degradation')
},
'alerts': self.drift_alerts[-10:] # Last 10 alerts
}
print("Monitoring system initialized")
# 2. Create baseline model
print("\n=== 2. Train Baseline Model ===")
from sklearn.datasets import make_classification
# Create baseline data
X_baseline, y_baseline = make_classification(n_samples=1000, n_features=20,
n_informative=15, random_state=42)
scaler = StandardScaler()
X_baseline_scaled = scaler.fit_transform(X_baseline)
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_baseline_scaled, y_baseline)
print(f"Baseline model trained on {len(X_baseline)} samples")
# 3. Initialize monitoring
monitor = ModelMonitoringSystem(model, scaler, X_baseline_scaled, y_baseline)
# 4. Simulate production data and monitoring
print("\n=== 3. Production Monitoring Simulation ===")
# Simulate normal production data
X_prod_normal = np.random.randn(500, 20) * 0.5
X_prod_normal_scaled = scaler.transform(X_prod_normal)
y_pred_normal = model.predict(X_prod_normal_scaled)
y_true_normal = np.random.randint(0, 2, 500)
metrics_normal = monitor.log_predictions(X_prod_normal_scaled, y_true_normal, y_pred_normal)
print(f"Normal production batch - Accuracy: {metrics_normal['accuracy']:.4f}")
# Simulate drifted data
X_prod_drift = np.random.randn(500, 20) * 2.0 # Different distribution
X_prod_drift_scaled = scaler.transform(X_prod_drift)
y_pred_drift = model.predict(X_prod_drift_scaled)
y_true_drift = np.random.randint(0, 2, 500)
metrics_drift = monitor.log_predictions(X_prod_drift_scaled, y_true_drift, y_pred_drift)
drift_detected, drift_features = monitor.detect_data_drift(X_prod_drift_scaled)
print(f"Drifted production batch - Accuracy: {metrics_drift['accuracy']:.4f}")
print(f"Data drift detected: {drift_detected}")
# Check performance degradation
perf_degradation = monitor.detect_performance_degradation(y_true_drift, y_pred_drift)
print(f"Performance degradation: {perf_degradation}")
# 5. Prometheus metrics export
print("\n=== 4. Prometheus Metrics Format ===")
prometheus_metrics = f"""
# HELP model_accuracy Model accuracy score
# TYPE model_accuracy gauge
model_accuracy{"{timestamp='2024-01-01'}"} {metrics_normal['accuracy']:.4f}
# HELP model_f1_score Model F1 score
# TYPE model_f1_score gauge
model_f1_score{"{timestamp='2024-01-01'}"} {metrics_normal['f1']:.4f}
# HELP model_predictions_total Total predictions made
# TYPE model_predictions_total counter
model_predictions_total 5000
# HELP model_drift_detected Data drift detection flag
# TYPE model_drift_detected gauge
model_drift_detected {int(drift_detected)}
# HELP model_latency_seconds Prediction latency in seconds
# TYPE model_latency_seconds histogram
model_latency_seconds_bucket{{le="0.01"}} 100
model_latency_seconds_bucket{{le="0.05"}} 450
model_latency_seconds_bucket{{le="0.1"}} 500
"""
print("Prometheus metrics:")
print(prometheus_metrics)
# 6. Grafana dashboard JSON
print("\n=== 5. Grafana Dashboard Configuration ===")
grafana_dashboard = {
'title': 'ML Model Monitoring Dashboard',
'panels': [
{
'title': 'Model Accuracy',
'targets': [{'metric': 'model_accuracy'}],
'type': 'graph'
},
{
'title': 'Data Drift Alerts',
'targets': [{'metric': 'model_drift_detected'}],
'type': 'stat'
},
{
'title': 'Prediction Latency',
'targets': [{'metric': 'model_latency_seconds'}],
'type': 'graph'
},
{
'title': 'Feature Distributions',
'targets': [{'metric': 'feature_distribution_change'}],
'type': 'heatmap'
}
]
}
print("Grafana dashboard configured with 4 panels")
# 7. Visualization
print("\n=== 6. Monitoring Visualization ===")
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
# Performance over time
metrics_df = pd.DataFrame(monitor.metrics_history)
axes[0, 0].plot(range(len(metrics_df)), metrics_df['accuracy'], marker='o', label='Accuracy')
axes[0, 0].axhline(y=metrics_df['accuracy'].mean(), color='r', linestyle='--', label='Mean')
axes[0, 0].set_xlabel('Batch')
axes[0, 0].set_ylabel('Accuracy')
axes[0, 0].set_title('Model Accuracy Over Time')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)
# Precision, Recall, F1
axes[0, 1].plot(range(len(metrics_df)), metrics_df['precision'], label='Precision', marker='s')
axes[0, 1].plot(range(len(metrics_df)), metrics_df['recall'], label='Recall', marker='^')
axes[0, 1].plot(range(len(metrics_df)), metrics_df['f1'], label='F1', marker='d')
axes[0, 1].set_xlabel('Batch')
axes[0, 1].set_ylabel('Score')
axes[0, 1].set_title('Performance Metrics Over Time')
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)
# Feature distributions (baseline vs current)
feature_stats = pd.DataFrame({
'Baseline Mean': np.mean(X_baseline, axis=0)[:10],
'Current Mean': np.mean(X_prod_drift, axis=0)[:10]
})
axes[1, 0].bar(range(len(feature_stats)), feature_stats['Baseline Mean'], alpha=0.7, label='Baseline')
axes[1, 0].bar(range(len(feature_stats)), feature_stats['Current Mean'], alpha=0.7, label='Current')
axes[1, 0].set_xlabel('Feature')
axes[1, 0].set_ylabel('Mean Value')
axes[1, 0].set_title('Feature Distribution Drift (First 10)')
axes[1, 0].legend()
axes[1, 0].grid(True, alpha=0.3, axis='y')
# Alerts over time
alert_types = [a['type'] for a in monitor.drift_alerts]
alert_counts = pd.Series(alert_types).value_counts()
axes[1, 1].barh(alert_counts.index, alert_counts.values, color=['red', 'orange', 'yellow'])
axes[1, 1].set_xlabel('Count')
axes[1, 1].set_title('Alert Types Distribution')
axes[1, 1].grid(True, alpha=0.3, axis='x')
plt.tight_layout()
plt.savefig('model_monitoring_dashboard.png', dpi=100, bbox_inches='tight')
print("\nMonitoring dashboard saved as 'model_monitoring_dashboard.png'")
# 8. Report generation
report = monitor.get_monitoring_report()
print("\n=== 7. Monitoring Report ===")
print(json.dumps(report, indent=2, default=str))
print("\nModel monitoring system setup completed!")
development
Implement Zero Trust security model with identity verification, microsegmentation, least privilege access, and continuous monitoring. Use when building secure cloud-native applications.
development
Prevent Cross-Site Scripting (XSS) attacks through input sanitization, output encoding, and Content Security Policy. Use when handling user-generated content in web applications.
tools
Create wireframes and interactive prototypes to visualize user interfaces and gather feedback early. Use tools and techniques to communicate design ideas before development.
development
Implement real-time bidirectional communication with WebSockets including connection management, message routing, and scaling. Use when building real-time features, chat systems, live notifications, or collaborative applications.