orchestrate-ml-pipeline
À propos
Cette compétence de Claude orchestre des pipelines de ML en production en utilisant Prefect ou Airflow, avec la construction de DAG, les dépendances entre tâches et la logique de nouvelle tentative. Elle gère la planification, la surveillance et s'intègre avec MLflow, DVC et les feature stores pour des flux de travail de bout en bout. Utilisez-la pour automatiser les processus de ML en plusieurs étapes, de l'ingestion des données au déploiement, y compris le réentraînement périodique et la reprise après échec.
Installation rapide
Claude Code
Recommandénpx skills add pjt222/agent-almanac -a claude-code/plugin add https://github.com/pjt222/agent-almanacgit clone https://github.com/pjt222/agent-almanac.git ~/.claude/skills/orchestrate-ml-pipelineCopiez et collez cette commande dans Claude Code pour installer cette compétence
Documentation
Orchestrate ML Pipeline
See Extended Examples for complete configuration files and templates.
Build and orchestrate end-to-end machine learning pipelines with dependency management, scheduling, and monitoring.
适用场景
- Automating multi-step ML workflows from data ingestion to deployment
- Scheduling periodic model retraining on fresh data
- Coordinating distributed data processing and training tasks
- Implementing complex dependencies between ML pipeline stages
- Managing retry logic and failure recovery
- Monitoring pipeline execution and alerting on failures
- Orchestrating feature engineering, training, evaluation, and deployment
- Building reproducible ML workflows across environments
输入
- 必需: ML pipeline components (data ingestion, preprocessing, training, evaluation)
- 必需: Orchestration framework choice (Prefect, Airflow, Kubeflow)
- 必需: Python environment with orchestration library installed
- 可选: Kubernetes cluster for distributed execution
- 可选: MLflow tracking server for experiment logging
- 可选: DVC for data versioning
- 可选: Slack/email for alerting
- 可选: Monitoring infrastructure (Prometheus, Grafana)
步骤
第 1 步:Choose and Install Orchestration Framework
Select appropriate framework and set up infrastructure.
# Option 1: Prefect (modern, Pythonic, simpler)
pip install prefect
pip install prefect-aws prefect-dask prefect-docker
# Start Prefect server (local development)
prefect server start
# Or use Prefect Cloud (managed)
# ... (see EXAMPLES.md for complete implementation)
Docker Compose for Airflow:
# docker-compose.airflow.yml
version: '3.8'
x-airflow-common: &airflow-common
image: apache/airflow:2.8.0
environment:
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
# ... (see EXAMPLES.md for complete implementation)
预期结果: Orchestration framework installed, web UI accessible (Prefect at http://localhost:4200, Airflow at http://localhost:8080), database initialized, scheduler running.
失败处理: Check port availability (netstat -tulpn | grep 8080), verify database connection, ensure Redis running for Celery, check Python version compatibility (Airflow requires ≥3.8), verify Docker daemon for containerized setup, inspect logs for initialization errors.
第 2 步:Build ML Pipeline with Prefect
Create Prefect flow with tasks for each pipeline stage.
# prefect_ml_pipeline.py
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
import pandas as pd
import mlflow
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
# ... (see EXAMPLES.md for complete implementation)
Deploy and schedule:
# deploy_prefect.py
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule
from prefect_ml_pipeline import ml_training_pipeline
# Create deployment with schedule
deployment = Deployment.build_from_flow(
flow=ml_training_pipeline,
# ... (see EXAMPLES.md for complete implementation)
预期结果: Prefect flow executes all tasks in correct order, task failures trigger retries automatically, successful runs show green in UI, MLflow logs experiments, model registered and deployed.
失败处理: Check task dependencies defined correctly, verify MLflow server accessible, ensure data source paths correct, check for circular dependencies, verify task timeout limits, inspect Prefect logs for detailed errors, check resource availability (memory/CPU).
第 3 步:Build ML Pipeline with Airflow
Create Airflow DAG for production ML workflow.
# dags/ml_training_dag.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta
import mlflow
import pandas as pd
# ... (see EXAMPLES.md for complete implementation)
预期结果: DAG appears in Airflow UI, scheduled runs execute on time, task failures trigger retries and alerts, XCom passes data between tasks, MLflow integration logs experiments.
失败处理: Check DAG file syntax (python dags/ml_training_dag.py), verify imports available in Airflow environment, ensure XCom not exceeding size limits (use file paths for large data), check email configuration for alerts, verify scheduler running, inspect task logs in Airflow UI.
第 4 步:Implement Advanced Features
Add dynamic DAGs, branching, and parallel execution.
# advanced_pipeline.py (Prefect)
from prefect import flow, task
from prefect.task_runners import DaskTaskRunner, ConcurrentTaskRunner
import time
@task
def process_shard(shard_id: int, data: list) -> dict:
"""Process data shard in parallel."""
# ... (see EXAMPLES.md for complete implementation)
Airflow branching:
# Airflow branching with BranchPythonOperator
from airflow.operators.python import BranchPythonOperator
def check_data_quality(**context):
"""Decide which branch to take."""
data_path = context['ti'].xcom_pull(key='data_path')
df = pd.read_csv(data_path)
# ... (see EXAMPLES.md for complete implementation)
预期结果: Parallel tasks execute concurrently (faster pipeline), conditional branches execute based on logic, dynamic task generation works, Dask cluster distributes work.
失败处理: Check Dask cluster configured and accessible, verify task_runner specified, ensure branching returns valid task IDs, check for resource contention with parallel tasks, verify conditional logic correctness.
第 5 步:Integrate Monitoring and Alerting
Add comprehensive monitoring and failure notifications.
# monitoring_integration.py
from prefect.blocks.notifications import SlackWebhook
from prefect import flow, task, get_run_logger
from prefect.context import FlowRunContext
@task(on_failure=[notify_failure])
def critical_task():
"""Task with failure notification."""
# ... (see EXAMPLES.md for complete implementation)
Airflow monitoring with sensors:
# Airflow SLA and monitoring
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
default_args = {
'sla': timedelta(hours=4), # Alert if task exceeds 4 hours
'on_failure_callback': slack_alert_failure,
'on_success_callback': slack_alert_success,
# ... (see EXAMPLES.md for complete implementation)
预期结果: Slack/email notifications sent on failures, SLA violations trigger alerts, custom metrics tracked, logs aggregated in monitoring system.
失败处理: Verify Slack webhook configured correctly, check email SMTP settings, ensure notification blocks loaded properly, verify SLA values reasonable, check for network issues blocking notifications.
第 6 步:Implement CI/CD for Pipelines
Version control and automate pipeline deployments.
# .github/workflows/deploy-pipeline.yml
name: Deploy ML Pipeline
on:
push:
branches: [main]
paths:
- 'pipelines/**'
# ... (see EXAMPLES.md for complete implementation)
预期结果: Pipeline tests pass before deployment, automated deployment to production, team notified on successful deployment, pipeline versioning tracked in Git.
失败处理: Check test coverage and failures, verify Prefect Cloud credentials, ensure deployment script handles errors, check Slack webhook configuration, inspect CI logs for deployment errors.
验证清单
- Orchestration framework installed and running
- Pipeline DAG defined with correct dependencies
- All tasks execute in proper order
- Retry logic functions correctly on failures
- Scheduled runs execute on time
- MLflow integration logs experiments
- DVC integration versions data
- Parallel tasks execute concurrently
- Conditional branches work correctly
- Monitoring and alerting functional
- CI/CD pipeline deploys automatically
- Pipeline reproducible across environments
常见问题
- Circular dependencies: Task A depends on B, B depends on A - carefully design DAG structure, use Airflow/Prefect validators
- Memory leaks: Long-running tasks accumulate memory - set task timeouts, monitor resource usage, restart workers periodically
- XCom size limits: Passing large data via XCom - use file paths or external storage (S3) instead of direct serialization
- Timezone confusion: Schedule runs at wrong times - always use UTC, explicitly set timezone in schedule
- Missing retries: Tasks fail permanently on transient errors - configure retries with exponential backoff
- Tight coupling: Tasks directly depend on implementation details - use clear interfaces, pass parameters explicitly
- No idempotency: Re-running tasks causes duplicates or errors - design tasks to be idempotent (safe to retry)
- Poor error handling: Failures don't provide useful context - add detailed logging, capture exceptions properly
- Resource contention: Parallel tasks overwhelm resources - limit concurrency, set resource quotas
- Version conflicts: Different tasks need incompatible dependencies - use Docker containers for task isolation
相关技能
track-ml-experiments- Integrate MLflow tracking into pipeline tasksversion-ml-data- Use DVC for data versioning in pipelinesbuild-feature-store- Materialize features as pipeline taskdeploy-ml-model-serving- Add deployment as final pipeline stagedeploy-to-kubernetes- Run orchestrated pipelines on Kubernetes
Dépôt GitHub
Compétences associées
llamaguard
AutreLlamaGuard est le modèle de Meta, doté de 7 à 8 milliards de paramètres, conçu pour modérer les entrées et sorties des LLM selon six catégories de sécurité comme la violence et les discours haineux. Il offre une précision de 94 à 95 % et peut être déployé avec vLLM, Hugging Face ou Amazon SageMaker. Utilisez cette compétence pour intégrer facilement le filtrage de contenu et des garde-fous de sécurité dans vos applications d'IA.
cost-optimization
AutreCette compétence de Claude aide les développeurs à optimiser les coûts du cloud grâce au redimensionnement des ressources, aux stratégies d'étiquetage et à l'analyse des dépenses. Elle fournit un cadre pour réduire les dépenses cloud et mettre en œuvre une gouvernance des coûts sur AWS, Azure et GCP. Utilisez-la lorsque vous devez analyser les coûts d'infrastructure, redimensionner les ressources ou respecter des contraintes budgétaires.
quantizing-models-bitsandbytes
AutreCette compétence quantifie les LLMs en précision 8 bits ou 4 bits à l'aide de bitsandbytes, permettant une réduction de 50 à 75 % de la mémoire utilisée avec une perte de précision minime. Elle est idéale pour exécuter des modèles plus volumineux sur une mémoire GPU limitée ou pour accélérer l'inférence, prenant en charge des formats comme INT8, NF4 et FP4. La compétence s'intègre à HuggingFace Transformers et permet l'entraînement QLoRA ainsi que l'utilisation d'optimiseurs en 8 bits.
dispatching-parallel-agents
AutreCette compétence Claude déploie plusieurs agents pour enquêter et résoudre simultanément 3 problèmes indépendants ou plus. Elle est conçue pour des scénarios impliquant des défaillances non liées qui peuvent être résolues sans état partagé ni dépendances. La capacité fondamentale est la résolution de problèmes en parallèle, en assignant un agent par domaine problématique indépendant afin de maximiser l'efficacité.
