Model Deployment
About
This skill enables developers to deploy machine learning models to production environments using popular tools like Flask, FastAPI, Docker, and cloud platforms. It covers both synchronous API deployment and batch processing approaches while addressing key considerations like scalability and monitoring. Use it when you need to operationalize trained models for real-world applications.
Documentation
Model deployment is the process of taking a trained machine learning model and making it available for production use through APIs, web services, or batch processing systems.
Deployment Approaches
- REST APIs: Flask, FastAPI for synchronous inference
- Batch Processing: Scheduled jobs for large-scale predictions
- Real-time Streaming: Kafka, Spark Streaming for continuous data
- Serverless: AWS Lambda, Google Cloud Functions
- Edge Deployment: TensorFlow Lite, ONNX Runtime for edge devices
- Model Serving: TensorFlow Serving, Seldon Core, BentoML
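The batch processing approach listed above can be sketched as a job that scores rows in fixed-size chunks, so memory use stays bounded however large the input is. This is a minimal illustration, not a definitive implementation: `iter_chunks`, `score_in_batches`, and `threshold_model` are hypothetical names, and `threshold_model` merely stands in for a real model's `predict` method.

```python
from typing import Callable, Iterable, Iterator, List, Sequence

Row = Sequence[float]


def iter_chunks(rows: Iterable[Row], chunk_size: int) -> Iterator[List[Row]]:
    """Yield successive lists of at most chunk_size rows."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    chunk: List[Row] = []
    for row in rows:
        chunk.append(row)
        if len(chunk) == chunk_size:
            yield chunk
            chunk = []
    if chunk:  # emit the final, possibly shorter, chunk
        yield chunk


def score_in_batches(
    rows: Iterable[Row],
    predict_fn: Callable[[List[Row]], List[int]],
    chunk_size: int = 1000,
) -> List[int]:
    """Run predict_fn over rows chunk by chunk and collect the labels in order."""
    results: List[int] = []
    for chunk in iter_chunks(rows, chunk_size):
        results.extend(predict_fn(chunk))
    return results


# Hypothetical stand-in for model.predict: label 1 when the row sum is positive.
def threshold_model(chunk: List[Row]) -> List[int]:
    return [1 if sum(row) > 0 else 0 for row in chunk]


rows = [[-1.0, 0.5], [2.0, 1.0], [0.0, 0.0], [3.0, -1.0], [-2.0, -2.0]]
labels = score_in_batches(rows, threshold_model, chunk_size=2)
print(labels)  # [0, 1, 0, 1, 0]
```

In a scheduled job, `rows` would typically be a generator reading from a file or database cursor rather than an in-memory list.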
Key Considerations
- Model Format: Pickle, SavedModel, ONNX, PMML
- Scalability: Load balancing, auto-scaling
- Latency: Response time requirements
- Monitoring: Model drift, performance metrics
- Versioning: Multiple model versions in production
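To make the versioning consideration concrete, here is a small in-memory sketch of keeping several model versions side by side, promoting one to active, and rolling back. `ModelRegistry` and its methods are hypothetical names for illustration only; a production setup would more likely rely on a dedicated registry such as the MLflow one used later in this document.

```python
from typing import Any, Dict, List, Optional


class ModelRegistry:
    """Keeps several model versions loaded and tracks which one is active."""

    def __init__(self) -> None:
        self._models: Dict[str, Any] = {}
        self._history: List[str] = []  # promoted versions, oldest first

    def register(self, version: str, model: Any) -> None:
        if version in self._models:
            raise ValueError(f"version {version!r} is already registered")
        self._models[version] = model

    def promote(self, version: str) -> None:
        """Make a registered version the active one."""
        if version not in self._models:
            raise KeyError(version)
        self._history.append(version)

    def rollback(self) -> str:
        """Drop the active version and return to the previously promoted one."""
        if len(self._history) < 2:
            raise RuntimeError("no previous version to roll back to")
        self._history.pop()
        return self._history[-1]

    @property
    def active_version(self) -> Optional[str]:
        return self._history[-1] if self._history else None

    def get(self, version: Optional[str] = None) -> Any:
        """Return the requested version, or the active one when none is given."""
        key = version or self.active_version
        if key is None:
            raise RuntimeError("no active model")
        return self._models[key]


registry = ModelRegistry()
registry.register("v1", lambda rows: [0 for _ in rows])  # toy models
registry.register("v2", lambda rows: [1 for _ in rows])
registry.promote("v1")
registry.promote("v2")
active_before = registry.active_version  # "v2"
restored = registry.rollback()           # "v1"
```

Keeping the promotion history as a list makes rollback a constant-time operation and leaves the older model objects loaded, so switching back does not require reloading from disk.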
Python Implementation
import numpy as np
import pandas as pd
import pickle
import json
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.datasets import make_classification
import joblib
# FastAPI for REST API
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel, Field
import uvicorn
# For model serving
import mlflow.pyfunc
import mlflow.sklearn
# Logging, timing, and type hints
import logging
import time
from typing import List, Dict
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
print("=== 1. Train and Save Model ===")
# Create dataset
X, y = make_classification(n_samples=1000, n_features=20, random_state=42)
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
# Train model
model = RandomForestClassifier(n_estimators=100, max_depth=10, random_state=42)
model.fit(X_scaled, y)
# Save model and preprocessing
model_path = '/tmp/model.pkl'
scaler_path = '/tmp/scaler.pkl'
joblib.dump(model, model_path)
joblib.dump(scaler, scaler_path)
print(f"Model saved to {model_path}")
print(f"Scaler saved to {scaler_path}")
# 2. Model Serving Class
print("\n=== 2. Model Serving Class ===")
class ModelPredictor:
    def __init__(self, model_path, scaler_path):
        self.model = joblib.load(model_path)
        self.scaler = joblib.load(scaler_path)
        self.load_time = time.time()
        self.predictions_count = 0
        logger.info("Model loaded successfully")

    def predict(self, features: List[List[float]]) -> Dict:
        try:
            X = np.array(features)
            X_scaled = self.scaler.transform(X)
            predictions = self.model.predict(X_scaled)
            probabilities = self.model.predict_proba(X_scaled)
            self.predictions_count += len(X)
            return {
                'predictions': predictions.tolist(),
                'probabilities': probabilities.tolist(),
                'count': len(X),
                'timestamp': time.time()
            }
        except Exception as e:
            logger.error(f"Prediction error: {str(e)}")
            raise

    def health_check(self) -> Dict:
        return {
            'status': 'healthy',
            'uptime': time.time() - self.load_time,
            'predictions': self.predictions_count
        }
# Initialize predictor
predictor = ModelPredictor(model_path, scaler_path)
# 3. FastAPI Application
print("\n=== 3. FastAPI Application ===")
app = FastAPI(
    title="ML Model API",
    description="Production ML model serving API",
    version="1.0.0"
)

class PredictionRequest(BaseModel):
    # Pydantic v2 (pinned in the requirements below) takes a list of examples
    # via `examples`; the model trained above expects 20 features per row
    features: List[List[float]] = Field(..., examples=[[[0.0] * 20]])

class PredictionResponse(BaseModel):
    predictions: List[int]
    probabilities: List[List[float]]
    count: int
    timestamp: float

class HealthResponse(BaseModel):
    status: str
    uptime: float
    predictions: int
@app.get("/health", response_model=HealthResponse)
async def health_check():
    """Health check endpoint"""
    return predictor.health_check()

@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    """Make predictions"""
    try:
        result = predictor.predict(request.features)
        return result
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))

@app.post("/predict-batch")
async def predict_batch(requests: List[PredictionRequest], background_tasks: BackgroundTasks):
    """Batch prediction with background processing"""
    all_features = []
    for req in requests:
        all_features.extend(req.features)
    result = predictor.predict(all_features)
    background_tasks.add_task(logger.info, f"Batch prediction processed: {result['count']} samples")
    return result

@app.get("/stats")
async def get_stats():
    """Get model statistics"""
    return {
        'model_type': type(predictor.model).__name__,
        'n_estimators': predictor.model.n_estimators,
        'max_depth': predictor.model.max_depth,
        'feature_importance': predictor.model.feature_importances_.tolist(),
        'total_predictions': predictor.predictions_count
    }
# 4. Dockerfile template
print("\n=== 4. Dockerfile Template ===")
dockerfile_content = '''FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY model.pkl .
COPY scaler.pkl .
COPY app.py .
EXPOSE 8000
CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]
'''
print("Dockerfile content:")
print(dockerfile_content)
# 5. Requirements file
print("\n=== 5. Requirements.txt ===")
requirements = """fastapi==0.104.1
uvicorn[standard]==0.24.0
numpy==1.24.0
pandas==2.1.0
scikit-learn==1.3.2
joblib==1.3.2
pydantic==2.5.0
mlflow==2.8.1
"""
print("Requirements:")
print(requirements)
# 6. Docker Compose for deployment
print("\n=== 6. Docker Compose Template ===")
docker_compose = '''version: '3.8'
services:
  ml-api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - LOG_LEVEL=info
      - WORKERS=4
    restart: unless-stopped
    healthcheck:
      # python:3.9-slim does not ship curl, so probe with the Python standard library
      test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"]
      interval: 10s
      timeout: 5s
      retries: 3
  ml-monitor:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
    command:
      - "--config.file=/etc/prometheus/prometheus.yml"
  ml-dashboard:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
    volumes:
      - ./grafana/dashboards:/etc/grafana/provisioning/dashboards
'''
print("Docker Compose content:")
print(docker_compose)
# 7. Testing the API
print("\n=== 7. Testing the API ===")
def test_predictor():
    # Test single prediction
    test_features = [[1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0,
                      1.1, 2.1, 3.1, 4.1, 5.1, 6.1, 7.1, 8.1, 9.1, 10.1]]
    result = predictor.predict(test_features)
    print(f"Prediction result: {result}")

    # Health check
    health = predictor.health_check()
    print(f"Health status: {health}")

    # Batch predictions
    batch_features = [
        [1.0] * 20,
        [2.0] * 20,
        [3.0] * 20,
    ]
    batch_result = predictor.predict(batch_features)
    print(f"Batch prediction: {batch_result['count']} samples processed")

test_predictor()
# 8. Model versioning and registry
print("\n=== 8. Model Registry with MLflow ===")
# Log model to MLflow
with mlflow.start_run():
    mlflow.sklearn.log_model(model, "model")
    mlflow.log_param("max_depth", 10)
    mlflow.log_param("n_estimators", 100)
    # Log the measured training-set accuracy rather than a hard-coded value
    mlflow.log_metric("accuracy", model.score(X_scaled, y))
    model_uri = "runs:/" + mlflow.active_run().info.run_id + "/model"
    print(f"Model logged to MLflow: {model_uri}")
# 9. Deployment monitoring code
print("\n=== 9. Monitoring Setup ===")
class ModelMonitor:
    def __init__(self):
        self.predictions = []
        self.latencies = []  # seconds

    def log_prediction(self, features, prediction, latency):
        # Record the latency (in seconds) so get_stats() has data to summarize
        self.latencies.append(latency)
        self.predictions.append({
            'timestamp': time.time(),
            'features_mean': np.mean(features),
            'prediction': prediction,
            'latency_ms': latency * 1000
        })

    def check_model_drift(self):
        # Need the 100 most recent predictions plus at least one older one as a baseline
        if len(self.predictions) <= 100:
            return {'drift_detected': False}
        recent_predictions = [p['prediction'] for p in self.predictions[-100:]]
        historical_mean = np.mean([p['prediction'] for p in self.predictions[:-100]])
        recent_mean = np.mean(recent_predictions)
        drift = bool(abs(recent_mean - historical_mean) > 0.1)
        return {
            'drift_detected': drift,
            'historical_mean': float(historical_mean),
            'recent_mean': float(recent_mean),
            'threshold': 0.1
        }

    def get_stats(self):
        if not self.latencies:
            return {}
        return {
            'avg_latency_ms': float(np.mean(self.latencies) * 1000),
            'p95_latency_ms': float(np.percentile(self.latencies, 95) * 1000),
            'p99_latency_ms': float(np.percentile(self.latencies, 99) * 1000),
            'total_predictions': len(self.predictions)
        }

monitor = ModelMonitor()
print("\nDeployment setup completed!")
print("To run FastAPI server: uvicorn app:app --reload")
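The script above creates a `ModelMonitor` but never feeds it. One possible way to connect the two, shown here as a sketch under stated assumptions, is a small wrapper that times each request and logs every row with the request latency. `predict_and_log`, `StubPredictor`, and `RecordingMonitor` are hypothetical names; the stubs only mirror the `predict` and `log_prediction` signatures used above so the example runs without a trained model.

```python
import time
from typing import Any, Dict, List


def predict_and_log(predictor: Any, monitor: Any, features: List[List[float]]) -> Dict:
    """Run one prediction request and record each row with the request latency."""
    start = time.perf_counter()
    result = predictor.predict(features)
    latency = time.perf_counter() - start  # seconds, the unit log_prediction converts to ms
    for row, label in zip(features, result["predictions"]):
        monitor.log_prediction(row, label, latency)
    return result


# Hypothetical stand-ins so the sketch is self-contained.
class StubPredictor:
    def predict(self, features: List[List[float]]) -> Dict:
        return {"predictions": [int(sum(row) > 0) for row in features]}


class RecordingMonitor:
    def __init__(self) -> None:
        self.calls = []

    def log_prediction(self, features, prediction, latency) -> None:
        self.calls.append((features, prediction, latency))


demo_monitor = RecordingMonitor()
demo_result = predict_and_log(StubPredictor(), demo_monitor, [[1.0, 2.0], [-3.0, 1.0]])
print(demo_result["predictions"], len(demo_monitor.calls))  # [1, 0] 2
```

With the real objects, the `/predict` endpoint could call `predict_and_log(predictor, monitor, request.features)` in place of `predictor.predict(request.features)`.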
Deployment Checklist
- Model format and serialization
- Input/output validation
- Error handling and logging
- Authentication and security
- Rate limiting and throttling
- Health check endpoints
- Monitoring and alerting
- Version management
- Rollback procedures
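For the rate limiting item in the checklist above, a token bucket is one common technique. The sketch below is illustrative only: `TokenBucket` is a hypothetical class, and the injectable `clock` parameter is an assumption added so the behavior can be checked deterministically.

```python
import time
from typing import Callable


class TokenBucket:
    """Allows bursts up to `capacity` requests and refills at `rate` tokens per second."""

    def __init__(
        self,
        rate: float,
        capacity: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.rate = rate
        self.capacity = capacity
        self._clock = clock
        self._tokens = capacity  # start with a full bucket
        self._last = clock()

    def allow(self, cost: float = 1.0) -> bool:
        """Return True and consume tokens if the request fits, else False."""
        now = self._clock()
        elapsed = now - self._last
        self._last = now
        # Refill in proportion to elapsed time, never beyond capacity
        self._tokens = min(self.capacity, self._tokens + elapsed * self.rate)
        if self._tokens >= cost:
            self._tokens -= cost
            return True
        return False


# Deterministic demonstration with a fake clock.
fake_now = [0.0]
bucket = TokenBucket(rate=1.0, capacity=2.0, clock=lambda: fake_now[0])
first_burst = [bucket.allow() for _ in range(3)]  # [True, True, False]
fake_now[0] = 1.0                                 # one second later: one token refilled
after_refill = bucket.allow()                     # True
```

In an API server, one bucket per client key would typically be kept, and a rejected request would be answered with HTTP 429.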
Cloud Deployment Options
- AWS: SageMaker, Lambda, EC2
- GCP: Vertex AI, Cloud Run, App Engine
- Azure: Machine Learning, App Service
- Kubernetes: Self-managed on-premises
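As an illustration of the serverless option, the following sketch shows the general shape of an AWS Lambda handler for inference. It assumes the function is invoked through an API Gateway proxy integration, where the request arrives as a JSON string under `event["body"]` and the response is a dict with `statusCode` and `body`. `toy_predict` is a hypothetical stand-in for a model loaded once at module import.

```python
import json
from typing import Any, Dict, List


def toy_predict(rows: List[List[float]]) -> List[int]:
    """Hypothetical stand-in for a real model's predict method."""
    return [int(sum(row) > 0) for row in rows]


def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    """Parse the JSON body, run inference, and return a proxy-style response."""
    try:
        body = json.loads(event.get("body") or "{}")
        features = body["features"]
    except (json.JSONDecodeError, KeyError, TypeError):
        return {
            "statusCode": 400,
            "body": json.dumps({"error": "expected a JSON body with a 'features' list"}),
        }
    predictions = toy_predict(features)
    return {
        "statusCode": 200,
        "headers": {"Content-Type": "application/json"},
        "body": json.dumps({"predictions": predictions, "count": len(predictions)}),
    }


# Local invocation with a hand-built event; no AWS account is needed for this check.
ok_event = {"body": json.dumps({"features": [[1.0, 2.0], [-1.0, -2.0]]})}
ok_response = lambda_handler(ok_event, None)
bad_response = lambda_handler({"body": "not json"}, None)
```

Loading the model at module level rather than inside the handler lets warm invocations reuse it, which matters for latency on serverless platforms.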
Performance Optimization
- Model quantization for smaller size
- Caching predictions
- Batch processing
- GPU acceleration
- Request pooling
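Prediction caching, one of the optimizations listed above, can be sketched with the standard library's `functools.lru_cache`. The example assumes the model is deterministic, so identical feature rows always yield the same label; `expensive_predict` is a hypothetical stand-in for the real inference call, and `call_count` exists only to show how often it actually runs.

```python
from functools import lru_cache
from typing import Sequence, Tuple

call_count = 0  # counts real inference calls, for demonstration only


def expensive_predict(features: Tuple[float, ...]) -> int:
    """Hypothetical stand-in for a slow model call."""
    global call_count
    call_count += 1
    return int(sum(features) > 0)


@lru_cache(maxsize=1024)
def cached_predict(features: Tuple[float, ...]) -> int:
    # lru_cache needs hashable arguments, hence the tuple
    return expensive_predict(features)


def predict_row(row: Sequence[float]) -> int:
    """Convert a list-like row to a tuple and look it up in the cache."""
    return cached_predict(tuple(row))


first = predict_row([0.5, 1.5])
second = predict_row([0.5, 1.5])   # served from the cache
third = predict_row([-1.0, 0.0])
print(first, second, third, call_count)  # 1 1 0 2
```

Exact-match caching only pays off when inputs repeat, for example with categorical or heavily rounded features; the cache must also be cleared (`cached_predict.cache_clear()`) whenever a new model version is deployed.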
Deliverables
- Deployed model endpoint
- API documentation
- Docker configuration
- Monitoring dashboard
- Deployment guide
- Performance benchmarks
- Scaling recommendations
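For the performance benchmarks deliverable, a simple latency benchmark might look like the sketch below. `percentile` uses the nearest-rank method, and `benchmark` times repeated calls to any prediction function. Both are hypothetical helper names, and the toy function passed in at the end stands in for a real model call.

```python
import math
import time
from typing import Any, Callable, Dict, List, Sequence


def percentile(values: Sequence[float], pct: float) -> float:
    """Nearest-rank percentile: smallest value with at least pct% of the data at or below it."""
    if not values:
        raise ValueError("values must not be empty")
    ordered = sorted(values)
    rank = max(1, math.ceil(pct * len(ordered) / 100))
    return ordered[rank - 1]


def benchmark(fn: Callable[[Any], Any], payload: Any, runs: int = 50) -> Dict[str, float]:
    """Call fn(payload) `runs` times and summarize the latency in milliseconds."""
    latencies_ms: List[float] = []
    for _ in range(runs):
        start = time.perf_counter()
        fn(payload)
        latencies_ms.append((time.perf_counter() - start) * 1000)
    return {
        "runs": float(runs),
        "mean_ms": sum(latencies_ms) / len(latencies_ms),
        "p50_ms": percentile(latencies_ms, 50),
        "p95_ms": percentile(latencies_ms, 95),
        "p99_ms": percentile(latencies_ms, 99),
    }


# Toy stand-in for a model call: sum each row of a small batch.
report = benchmark(lambda rows: [sum(r) for r in rows], [[1.0] * 20] * 100, runs=20)
print(sorted(report))
```

Reporting p95 and p99 alongside the mean matters because tail latency, not average latency, is usually what breaks a response-time requirement.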
Quick Install
/plugin add https://github.com/aj-geddes/useful-ai-prompts/tree/main/model-deployment
Copy and paste this command into Claude Code to install this skill.
