Model Monitoring
About
This Claude Skill monitors production ML models to detect performance degradation, data drift, and concept drift. It integrates Prometheus for metrics collection, Grafana for visualization, and MLflow for model tracking. Use this skill to maintain model reliability and get alerts on anomalies in your production environment.
Documentation
Model Monitoring
Monitoring deployed machine learning models ensures they continue to perform well in production, detecting data drift, concept drift, and performance degradation.
Monitoring Components
- Performance Metrics: Accuracy, latency, throughput
- Data Drift: Distribution changes in input features
- Concept Drift: Changes in target variable relationships
- Output Drift: Changes in prediction distribution
- Feature Drift: Individual feature distribution changes
- Anomaly Detection: Unusual samples in production
Monitoring Tools
- Prometheus: Metrics collection and storage
- Grafana: Visualization and dashboarding
- MLflow: Model tracking and registry
- TensorFlow Data Validation: Data statistics
- Evidently: Drift detection and monitoring
- Great Expectations: Data quality assertions
Python Implementation
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from scipy import stats
import json
from datetime import datetime, timedelta
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
print("=== 1. Production Monitoring System ===")
class ModelMonitoringSystem:
def __init__(self, model, scaler, baseline_data, baseline_targets):
self.model = model
self.scaler = scaler
self.baseline_data = baseline_data
self.baseline_targets = baseline_targets
self.baseline_mean = baseline_data.mean(axis=0)
self.baseline_std = baseline_data.std(axis=0)
self.baseline_predictions = model.predict(baseline_data)
self.metrics_history = []
self.drift_alerts = []
self.performance_history = []
def log_predictions(self, X, y_true, y_pred):
"""Log predictions and compute metrics"""
timestamp = datetime.now()
# Compute metrics
accuracy = accuracy_score(y_true, y_pred)
precision = precision_score(y_true, y_pred, average='weighted', zero_division=0)
recall = recall_score(y_true, y_pred, average='weighted', zero_division=0)
f1 = f1_score(y_true, y_pred, average='weighted', zero_division=0)
metric_record = {
'timestamp': timestamp,
'accuracy': accuracy,
'precision': precision,
'recall': recall,
'f1': f1,
'n_samples': len(X)
}
self.metrics_history.append(metric_record)
return metric_record
def detect_data_drift(self, X_new):
"""Detect data drift using Kolmogorov-Smirnov test"""
drift_detected = False
drift_features = []
for feature_idx in range(X_new.shape[1]):
baseline_feature = self.baseline_data[:, feature_idx]
new_feature = X_new[:, feature_idx]
# KS Test
ks_statistic, p_value = stats.ks_2samp(baseline_feature, new_feature)
if p_value < 0.05: # Significant drift detected
drift_detected = True
drift_features.append({
'feature_index': feature_idx,
'ks_statistic': float(ks_statistic),
'p_value': float(p_value)
})
if drift_detected:
alert = {
'timestamp': datetime.now(),
'type': 'data_drift',
'severity': 'high',
'drifted_features': drift_features,
'n_drifted': len(drift_features)
}
self.drift_alerts.append(alert)
logger.warning(f"Data drift detected in {len(drift_features)} features")
return drift_detected, drift_features
def detect_output_drift(self, y_pred_new):
"""Detect drift in model predictions"""
baseline_pred_dist = pd.Series(self.baseline_predictions).value_counts(normalize=True)
new_pred_dist = pd.Series(y_pred_new).value_counts(normalize=True)
# Compare distributions
classes = set(baseline_pred_dist.index) | set(new_pred_dist.index)
chi2_stat = 0
for cls in classes:
exp = baseline_pred_dist.get(cls, 0.01)
obs = new_pred_dist.get(cls, 0.01)
chi2_stat += (obs - exp) ** 2 / max(exp, 0.01)
p_value = 1 - stats.chi2.cdf(chi2_stat, len(classes) - 1)
if p_value < 0.05:
alert = {
'timestamp': datetime.now(),
'type': 'output_drift',
'severity': 'medium',
'chi2_statistic': float(chi2_stat),
'p_value': float(p_value)
}
self.drift_alerts.append(alert)
logger.warning("Output drift detected in predictions")
return True
return False
def detect_performance_degradation(self, y_true, y_pred):
"""Detect if model performance has degraded"""
current_accuracy = accuracy_score(y_true, y_pred)
baseline_accuracy = accuracy_score(self.baseline_targets, self.baseline_predictions)
degradation_threshold = 0.05 # 5% drop
degradation = baseline_accuracy - current_accuracy
if degradation > degradation_threshold:
alert = {
'timestamp': datetime.now(),
'type': 'performance_degradation',
'severity': 'critical',
'baseline_accuracy': float(baseline_accuracy),
'current_accuracy': float(current_accuracy),
'degradation': float(degradation)
}
self.drift_alerts.append(alert)
logger.error("Performance degradation detected")
return True
return False
def get_monitoring_report(self):
"""Generate monitoring report"""
if not self.metrics_history:
return {}
metrics_df = pd.DataFrame(self.metrics_history)
return {
'monitoring_period': {
'start': self.metrics_history[0]['timestamp'].isoformat(),
'end': self.metrics_history[-1]['timestamp'].isoformat(),
'n_batches': len(self.metrics_history)
},
'performance_summary': {
'avg_accuracy': float(metrics_df['accuracy'].mean()),
'min_accuracy': float(metrics_df['accuracy'].min()),
'max_accuracy': float(metrics_df['accuracy'].max()),
'std_accuracy': float(metrics_df['accuracy'].std()),
'avg_f1': float(metrics_df['f1'].mean())
},
'drift_summary': {
'total_alerts': len(self.drift_alerts),
'data_drift_alerts': sum(1 for a in self.drift_alerts if a['type'] == 'data_drift'),
'output_drift_alerts': sum(1 for a in self.drift_alerts if a['type'] == 'output_drift'),
'performance_alerts': sum(1 for a in self.drift_alerts if a['type'] == 'performance_degradation')
},
'alerts': self.drift_alerts[-10:] # Last 10 alerts
}
print("Monitoring system initialized")
# 2. Create baseline model
print("\n=== 2. Train Baseline Model ===")
from sklearn.datasets import make_classification
# Create baseline data
X_baseline, y_baseline = make_classification(n_samples=1000, n_features=20,
n_informative=15, random_state=42)
scaler = StandardScaler()
X_baseline_scaled = scaler.fit_transform(X_baseline)
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_baseline_scaled, y_baseline)
print(f"Baseline model trained on {len(X_baseline)} samples")
# 3. Initialize monitoring
monitor = ModelMonitoringSystem(model, scaler, X_baseline_scaled, y_baseline)
# 4. Simulate production data and monitoring
print("\n=== 3. Production Monitoring Simulation ===")
# Simulate normal production data
X_prod_normal = np.random.randn(500, 20) * 0.5
X_prod_normal_scaled = scaler.transform(X_prod_normal)
y_pred_normal = model.predict(X_prod_normal_scaled)
y_true_normal = np.random.randint(0, 2, 500)
metrics_normal = monitor.log_predictions(X_prod_normal_scaled, y_true_normal, y_pred_normal)
print(f"Normal production batch - Accuracy: {metrics_normal['accuracy']:.4f}")
# Simulate drifted data
X_prod_drift = np.random.randn(500, 20) * 2.0 # Different distribution
X_prod_drift_scaled = scaler.transform(X_prod_drift)
y_pred_drift = model.predict(X_prod_drift_scaled)
y_true_drift = np.random.randint(0, 2, 500)
metrics_drift = monitor.log_predictions(X_prod_drift_scaled, y_true_drift, y_pred_drift)
drift_detected, drift_features = monitor.detect_data_drift(X_prod_drift_scaled)
print(f"Drifted production batch - Accuracy: {metrics_drift['accuracy']:.4f}")
print(f"Data drift detected: {drift_detected}")
# Check performance degradation
perf_degradation = monitor.detect_performance_degradation(y_true_drift, y_pred_drift)
print(f"Performance degradation: {perf_degradation}")
# 5. Prometheus metrics export
print("\n=== 4. Prometheus Metrics Format ===")
prometheus_metrics = f"""
# HELP model_accuracy Model accuracy score
# TYPE model_accuracy gauge
model_accuracy{"{timestamp='2024-01-01'}"} {metrics_normal['accuracy']:.4f}
# HELP model_f1_score Model F1 score
# TYPE model_f1_score gauge
model_f1_score{"{timestamp='2024-01-01'}"} {metrics_normal['f1']:.4f}
# HELP model_predictions_total Total predictions made
# TYPE model_predictions_total counter
model_predictions_total 5000
# HELP model_drift_detected Data drift detection flag
# TYPE model_drift_detected gauge
model_drift_detected {int(drift_detected)}
# HELP model_latency_seconds Prediction latency in seconds
# TYPE model_latency_seconds histogram
model_latency_seconds_bucket{{le="0.01"}} 100
model_latency_seconds_bucket{{le="0.05"}} 450
model_latency_seconds_bucket{{le="0.1"}} 500
"""
print("Prometheus metrics:")
print(prometheus_metrics)
# 6. Grafana dashboard JSON
print("\n=== 5. Grafana Dashboard Configuration ===")
grafana_dashboard = {
'title': 'ML Model Monitoring Dashboard',
'panels': [
{
'title': 'Model Accuracy',
'targets': [{'metric': 'model_accuracy'}],
'type': 'graph'
},
{
'title': 'Data Drift Alerts',
'targets': [{'metric': 'model_drift_detected'}],
'type': 'stat'
},
{
'title': 'Prediction Latency',
'targets': [{'metric': 'model_latency_seconds'}],
'type': 'graph'
},
{
'title': 'Feature Distributions',
'targets': [{'metric': 'feature_distribution_change'}],
'type': 'heatmap'
}
]
}
print("Grafana dashboard configured with 4 panels")
# 7. Visualization
print("\n=== 6. Monitoring Visualization ===")
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
# Performance over time
metrics_df = pd.DataFrame(monitor.metrics_history)
axes[0, 0].plot(range(len(metrics_df)), metrics_df['accuracy'], marker='o', label='Accuracy')
axes[0, 0].axhline(y=metrics_df['accuracy'].mean(), color='r', linestyle='--', label='Mean')
axes[0, 0].set_xlabel('Batch')
axes[0, 0].set_ylabel('Accuracy')
axes[0, 0].set_title('Model Accuracy Over Time')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)
# Precision, Recall, F1
axes[0, 1].plot(range(len(metrics_df)), metrics_df['precision'], label='Precision', marker='s')
axes[0, 1].plot(range(len(metrics_df)), metrics_df['recall'], label='Recall', marker='^')
axes[0, 1].plot(range(len(metrics_df)), metrics_df['f1'], label='F1', marker='d')
axes[0, 1].set_xlabel('Batch')
axes[0, 1].set_ylabel('Score')
axes[0, 1].set_title('Performance Metrics Over Time')
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)
# Feature distributions (baseline vs current)
feature_stats = pd.DataFrame({
'Baseline Mean': np.mean(X_baseline, axis=0)[:10],
'Current Mean': np.mean(X_prod_drift, axis=0)[:10]
})
axes[1, 0].bar(range(len(feature_stats)), feature_stats['Baseline Mean'], alpha=0.7, label='Baseline')
axes[1, 0].bar(range(len(feature_stats)), feature_stats['Current Mean'], alpha=0.7, label='Current')
axes[1, 0].set_xlabel('Feature')
axes[1, 0].set_ylabel('Mean Value')
axes[1, 0].set_title('Feature Distribution Drift (First 10)')
axes[1, 0].legend()
axes[1, 0].grid(True, alpha=0.3, axis='y')
# Alerts over time
alert_types = [a['type'] for a in monitor.drift_alerts]
alert_counts = pd.Series(alert_types).value_counts()
axes[1, 1].barh(alert_counts.index, alert_counts.values, color=['red', 'orange', 'yellow'])
axes[1, 1].set_xlabel('Count')
axes[1, 1].set_title('Alert Types Distribution')
axes[1, 1].grid(True, alpha=0.3, axis='x')
plt.tight_layout()
plt.savefig('model_monitoring_dashboard.png', dpi=100, bbox_inches='tight')
print("\nMonitoring dashboard saved as 'model_monitoring_dashboard.png'")
# 8. Report generation
report = monitor.get_monitoring_report()
print("\n=== 7. Monitoring Report ===")
print(json.dumps(report, indent=2, default=str))
print("\nModel monitoring system setup completed!")
Drift Detection Techniques
- Kolmogorov-Smirnov Test: Statistical test for distribution change
- Population Stability Index: Measures feature distribution shifts
- Chi-square Test: Categorical feature drift
- Wasserstein Distance: Optimal transport-based distance
- Jensen-Shannon Divergence: Information-theoretic metric
Alert Thresholds
- Critical: >5% accuracy drop, immediate action
- High: Data drift in 3+ features, investigation needed
- Medium: Output drift, monitoring increased
- Low: Single feature drift, log and track
Deliverables
- Monitoring dashboards
- Alert configuration
- Drift detection reports
- Performance degradation analysis
- Retraining recommendations
- Action playbook
Quick Install
/plugin add https://github.com/aj-geddes/useful-ai-prompts/tree/main/model-monitoringCopy and paste this command in Claude Code to install this skill
GitHub 仓库
Related Skills
llamaindex
MetaLlamaIndex is a data framework for building RAG-powered LLM applications, specializing in document ingestion, indexing, and querying. It provides key features like vector indices, query engines, and agents, and supports over 300 data connectors. Use it for document Q&A, chatbots, and knowledge retrieval when building data-centric applications.
csv-data-summarizer
MetaThis skill automatically analyzes CSV files to generate comprehensive statistical summaries and visualizations using Python's pandas and matplotlib/seaborn. It should be triggered whenever a user uploads or references CSV data without prompting for analysis preferences. The tool provides immediate insights into data structure, quality, and patterns through automated analysis and visualization.
hybrid-cloud-networking
MetaThis skill configures secure hybrid cloud networking between on-premises infrastructure and cloud platforms like AWS, Azure, and GCP. Use it when connecting data centers to the cloud, building hybrid architectures, or implementing secure cross-premises connectivity. It supports key capabilities such as VPNs and dedicated connections like AWS Direct Connect for high-performance, reliable setups.
Excel Analysis
MetaThis skill enables developers to analyze Excel files and perform data operations using pandas. It can read spreadsheets, create pivot tables, generate charts, and conduct data analysis on .xlsx files and tabular data. Use it when working with Excel files, spreadsheets, or any structured tabular data within Claude Code.
