MCP Hub

orchestrate-ml-pipeline

pjt222
Updated 2 days ago
Other · Tags: ai, automation, data

About

This skill orchestrates end-to-end machine learning pipelines with Prefect or Airflow, handling DAG construction, task dependencies, scheduling, and retry logic. It integrates with ML tools such as MLflow and DVC for production workflows. Use it to automate multi-stage ML processes, from data ingestion to deployment, or to manage periodic model retraining.

Quick install

Claude Code

Recommended (primary)
npx skills add pjt222/agent-almanac -a claude-code
Plugin command (alternative)
/plugin add https://github.com/pjt222/agent-almanac
Git clone (alternative)
git clone https://github.com/pjt222/agent-almanac.git ~/.claude/skills/orchestrate-ml-pipeline

Copy and paste this command into Claude Code to install this skill.

Documentation

Orchestrate ML Pipeline

See Extended Examples for complete configuration files and templates.

End-to-end ML pipelines: deps, scheduling, monitoring.

Use When

  • Automate multi-step ML (ingest → deploy)
  • Schedule periodic retraining
  • Coord distributed data + training
  • Complex stage deps
  • Retry + failure recovery
  • Monitor execution + alerting
  • Orchestrate feature eng/train/eval/deploy
  • Reproducible across envs

In

  • Required: Pipeline components (ingest, preprocess, train, eval)
  • Required: Framework (Prefect, Airflow, Kubeflow)
  • Required: Python env w/ orchestration lib
  • Optional: K8s cluster (distributed)
  • Optional: MLflow tracking server
  • Optional: DVC for data versioning
  • Optional: Slack/email alerts
  • Optional: Monitoring (Prometheus, Grafana)

Do

Step 1: Pick + install framework

Select + setup infra.

# Option 1: Prefect (modern, Pythonic, simpler)
pip install prefect
pip install prefect-aws prefect-dask prefect-docker

# Start Prefect server (local development)
prefect server start

# Or use Prefect Cloud (managed)
# ... (see EXAMPLES.md for complete implementation)

Docker Compose for Airflow:

# docker-compose.airflow.yml
version: '3.8'

x-airflow-common: &airflow-common
  image: apache/airflow:2.8.0
  environment:
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
# ... (see EXAMPLES.md for complete implementation)

→ Framework installed, UI accessible (Prefect http://localhost:4200, Airflow http://localhost:8080), DB initialized, scheduler running.

If err: ports (netstat -tulpn | grep 8080), DB conn, Redis for Celery, Python ≥3.8 for Airflow, Docker daemon for containerized, init logs.
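A sketch of the first check in that list, assuming a local install: test whether anything is already listening on a UI port before starting a server. `port_in_use` is a hypothetical helper, not part of Prefect or Airflow.

```python
import socket


def port_in_use(port: int, host: str = "127.0.0.1", timeout: float = 0.5) -> bool:
    """Return True if a TCP connection to host:port succeeds (something is listening)."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(timeout)
        # connect_ex returns 0 on success instead of raising on a refused connection
        return sock.connect_ex((host, port)) == 0
```

For example, `port_in_use(8080)` returning `True` before Airflow starts means another process already holds the webserver port; `port_in_use(4200)` does the same check for the Prefect UI.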

Step 2: ML pipeline w/ Prefect

Flow w/ tasks per stage.

# prefect_ml_pipeline.py
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
import pandas as pd
import mlflow
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
# ... (see EXAMPLES.md for complete implementation)
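The `task_input_hash` and `timedelta` imports point at Prefect's input-based caching: a task called again with the same inputs inside the expiration window reuses its earlier result. In Prefect that is configured as `@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))`. The stdlib sketch below only imitates the idea to show the mechanics; `cache_by_inputs` and `load_data` are hypothetical and do not reflect how Prefect stores results.

```python
import hashlib
import json
import time
from functools import wraps


def cache_by_inputs(expiration_s: float):
    """Memoize a function on a hash of its arguments, with a time-based expiry."""
    def decorator(fn):
        cache = {}  # key -> (timestamp, result)

        @wraps(fn)
        def wrapper(*args, **kwargs):
            # Hash the JSON form of the inputs, as a cache key function would
            payload = json.dumps([args, kwargs], sort_keys=True, default=str)
            key = hashlib.sha256(payload.encode()).hexdigest()
            hit = cache.get(key)
            if hit is not None and time.monotonic() - hit[0] < expiration_s:
                return hit[1]  # cached result still fresh: skip recomputation
            result = fn(*args, **kwargs)
            cache[key] = (time.monotonic(), result)
            return result

        return wrapper
    return decorator


calls = []  # records real executions, to show cache hits


@cache_by_inputs(expiration_s=3600)
def load_data(path: str) -> int:
    calls.append(path)
    return len(path)  # stand-in for an expensive load
```

Calling `load_data("data/raw.csv")` twice executes the body once; a different path is a different key and runs again.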

Deploy + schedule:

# deploy_prefect.py
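# Prefect 2.x API: Prefect 3 removed Deployment.build_from_flow in favor of flow.serve() / flow.deploy()
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule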
from prefect_ml_pipeline import ml_training_pipeline

# Create deployment with schedule
deployment = Deployment.build_from_flow(
    flow=ml_training_pipeline,
# ... (see EXAMPLES.md for complete implementation)
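Schedules are where the timezone trap bites. A minimal sketch, assuming a daily cron such as `0 2 * * *` (02:00) evaluated in UTC: compute the next fire time from a timezone-aware `datetime`, so the result does not depend on the server's local zone. `next_daily_run` is a hypothetical helper; Prefect and Airflow use their own cron parsers.

```python
from datetime import datetime, timedelta, timezone


def next_daily_run(now: datetime, hour: int, minute: int = 0) -> datetime:
    """Next occurrence of HH:MM UTC strictly after `now` (cron 'M H * * *')."""
    if now.tzinfo is None:
        raise ValueError("use timezone-aware datetimes")
    now_utc = now.astimezone(timezone.utc)  # normalize any input zone to UTC
    candidate = now_utc.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now_utc:
        candidate += timedelta(days=1)  # today's slot already passed
    return candidate
```

An input of 01:30 at UTC-5 is 06:30 UTC, so the next 02:00 UTC run is the following day, which is exactly the kind of shift that naive local times hide.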

→ Flow runs all tasks in order, retries auto on failure, success = green in UI, MLflow logs, model registered + deployed.

If err: deps defined? MLflow accessible? Data paths correct? Circular deps? Task timeout? Prefect logs. Resources (mem/CPU)?
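Retries turn a transient failure (flaky DB connection, MLflow timeout) into a recovered run. Prefect configures this with `@task(retries=..., retry_delay_seconds=...)` and Airflow with `retries` and `retry_exponential_backoff=True`. The stdlib sketch below shows the underlying pattern; `retry_with_backoff` is hypothetical, and `sleep` is injectable so tests need not wait.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    fn: Callable[[], T],
    retries: int = 3,
    base_delay: float = 1.0,
    retry_on: tuple = (ConnectionError, TimeoutError),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn; on a retryable error wait base_delay * 2**attempt and try again."""
    for attempt in range(retries + 1):
        try:
            return fn()
        except retry_on:
            if attempt == retries:
                raise  # out of retries: surface the last error
            sleep(base_delay * 2 ** attempt)  # 1s, 2s, 4s, ... with base_delay=1
    raise AssertionError("unreachable")
```

Only the listed exception types are retried; a bug such as a `KeyError` fails immediately instead of burning retries.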

Step 3: ML pipeline w/ Airflow

DAG for prod ML.

# dags/ml_training_dag.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.utils.dates import days_ago  # deprecated in Airflow 2.x, removed in 3; prefer a fixed start_date=datetime(...)
from datetime import datetime, timedelta
import mlflow
import pandas as pd
# ... (see EXAMPLES.md for complete implementation)
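Airflow declares dependencies with `>>` (for example `ingest >> validate >> train`), and a task runs only after its upstream tasks succeed. A stdlib sketch of that ordering with `graphlib` (Python 3.9+); the task names are hypothetical stand-ins for the DAG's operators. The same call doubles as a cycle validator, since a circular dependency raises `CycleError`.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical ML DAG: each task maps to the set of tasks it depends on
ML_DAG = {
    "validate": {"ingest"},
    "preprocess": {"validate"},
    "train": {"preprocess"},
    "evaluate": {"train"},
    "deploy": {"evaluate"},
}


def execution_order(dag: dict) -> list:
    """Return a dependency-respecting run order; raises CycleError on circular deps."""
    return list(TopologicalSorter(dag).static_order())


def has_cycle(dag: dict) -> bool:
    """True if the dependency graph contains a cycle (A -> B -> A)."""
    try:
        execution_order(dag)
        return False
    except CycleError:
        return True
```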

→ DAG in Airflow UI, scheduled runs on time, retries + alerts on failure, XCom passes between tasks, MLflow logs.

If err: syntax check (python dags/ml_training_dag.py), imports avail in env, XCom size (use file paths for big data), email config, scheduler running, task logs in UI.
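XCom values are stored in Airflow's metadata database, so large objects should stay out of it. A sketch of the file-path pattern, assuming a filesystem shared by workers (an S3 URI works the same way): the upstream callable writes data and returns only the path, which becomes its XCom value; the downstream callable reads from that path. `extract_to_file` and `count_rows` are hypothetical task callables.

```python
import csv
import os


def extract_to_file(rows: list, out_dir: str) -> str:
    """Write rows to CSV and return the path (a small string is XCom-friendly)."""
    path = os.path.join(out_dir, "features.csv")
    with open(path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["id", "value"])
        writer.writerows(rows)
    return path  # a PythonOperator's return value is pushed to XCom


def count_rows(path: str) -> int:
    """Downstream step: gets the path (e.g. via ti.xcom_pull) and loads the data itself."""
    with open(path, newline="") as f:
        return sum(1 for _ in csv.DictReader(f))
```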

Step 4: Advanced features

Dynamic DAGs, branching, parallel.

# advanced_pipeline.py (Prefect)
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner
from prefect_dask import DaskTaskRunner  # lives in the prefect-dask package installed in step 1
import time

@task
def process_shard(shard_id: int, data: list) -> dict:
    """Process data shard in parallel."""
# ... (see EXAMPLES.md for complete implementation)
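The shard pattern fans one task out over many inputs and gathers the results. A stdlib sketch of the same fan-out/fan-in with `concurrent.futures`, standing in for a concurrent task runner (in Prefect, roughly `process_shard.submit(...)` returning futures whose `.result()` you collect). `summarize_shard` has a hypothetical body; `max_workers` caps concurrency, which is also the fix for the resource-contention trap.

```python
from concurrent.futures import ThreadPoolExecutor


def summarize_shard(shard_id: int, data: list) -> dict:
    """Hypothetical per-shard work: count and sum the values."""
    return {"shard_id": shard_id, "count": len(data), "total": sum(data)}


def fan_out(shards: list, max_workers: int = 4) -> list:
    """Run one summarize_shard call per shard concurrently; results come back in shard order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(summarize_shard, i, shard) for i, shard in enumerate(shards)]
        return [f.result() for f in futures]  # .result() re-raises any shard error


def fan_in(results: list) -> dict:
    """Combine per-shard summaries into one."""
    return {
        "count": sum(r["count"] for r in results),
        "total": sum(r["total"] for r in results),
    }
```

Threads suit I/O-bound shards; CPU-heavy shards would need processes or a Dask cluster instead.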

Airflow branching:

# Airflow branching with BranchPythonOperator
import pandas as pd
from airflow.operators.python import BranchPythonOperator

def check_data_quality(**context):
    """Decide which branch to take."""
    data_path = context['ti'].xcom_pull(key='data_path')
    df = pd.read_csv(data_path)

# ... (see EXAMPLES.md for complete implementation)
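A `BranchPythonOperator` callable returns the `task_id` (or list of IDs) to follow, and Airflow skips the other downstream branches. The decision itself can therefore be a plain, unit-testable function that `check_data_quality` calls. A sketch assuming a null-fraction quality gate; the threshold and the task IDs `train_model` and `alert_bad_data` are hypothetical.

```python
import math


def null_fraction(values: list) -> float:
    """Share of missing values (None or NaN) in a column."""
    if not values:
        return 1.0  # treat an empty column as fully missing
    missing = sum(1 for v in values if v is None or (isinstance(v, float) and math.isnan(v)))
    return missing / len(values)


def choose_branch(values: list, max_null_fraction: float = 0.05) -> str:
    """Return the task_id of the branch to run next."""
    if null_fraction(values) > max_null_fraction:
        return "alert_bad_data"
    return "train_model"
```

Every returned ID must match a task directly downstream of the branch operator, otherwise the run fails.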

→ Parallel tasks concurrent (faster), conditional branches by logic, dynamic gen works, Dask distributes.

If err: Dask cluster configured + accessible? task_runner specified? Branches return valid task IDs? Resource contention w/ parallel? Conditional logic correct?

Step 5: Monitoring + alerting

Failure notifications + comprehensive monitoring.

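# monitoring_integration.py
from prefect.blocks.notifications import SlackWebhook
from prefect import flow, task, get_run_logger
from prefect.context import FlowRunContext


def notify_failure(task, task_run, state):
    """Task on_failure hook; must be defined before the decorator uses it. Prefect passes (task, task_run, state)."""
    # "ml-alerts" is a hypothetical name for a saved SlackWebhook block
    SlackWebhook.load("ml-alerts").notify(f"Task run {task_run.name} failed: {state.message}")
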
@task(on_failure=[notify_failure])
def critical_task():
    """Task with failure notification."""
# ... (see EXAMPLES.md for complete implementation)
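Whatever the channel, a useful failure alert carries context: which task, which error. A framework-free sketch of that pattern, where `notify` is any callable (wrapping a Prefect notification block or an Airflow `on_failure_callback`, for instance); `run_with_alert` and its message format are hypothetical.

```python
from typing import Any, Callable


def run_with_alert(task_name: str, fn: Callable[..., Any], notify: Callable[[str], None], *args: Any, **kwargs: Any) -> Any:
    """Run fn; on failure send one alert with context, then re-raise so the run still fails."""
    try:
        return fn(*args, **kwargs)
    except Exception as exc:
        notify(f"[ALERT] task={task_name} failed: {type(exc).__name__}: {exc}")
        raise  # keep the failure visible to the orchestrator (retries, UI state)
```

Re-raising matters: swallowing the exception would mark the task green and skip the framework's own retries.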

Airflow monitoring w/ sensors:

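# Airflow SLA and monitoring
from datetime import timedelta
from airflow.sensors.base import BaseSensorOperator
# airflow.utils.decorators.apply_defaults is not needed: it is a deprecated no-op since Airflow 2.0

# slack_alert_failure / slack_alert_success: callbacks taking the task context dict, defined before this point
default_args = {
    'sla': timedelta(hours=4),  # SLA miss if task not done 4h after the DAG run start (SLAs removed in Airflow 3)
    'on_failure_callback': slack_alert_failure,
    'on_success_callback': slack_alert_success,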
# ... (see EXAMPLES.md for complete implementation)
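The breach condition behind an SLA is a plain deadline comparison, which is handy for custom sensors or reporting. A sketch assuming timezone-aware UTC timestamps; `sla_breached` is hypothetical and not Airflow's internal SLA logic.

```python
from datetime import datetime, timedelta
from typing import Optional


def sla_breached(run_start: datetime, sla: timedelta, finished_at: Optional[datetime], now: datetime) -> bool:
    """True if the task missed its deadline: finished late, or still unfinished past the deadline."""
    deadline = run_start + sla
    if finished_at is not None:
        return finished_at > deadline
    return now > deadline
```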

→ Slack/email on failures, SLA breach = alert, custom metrics tracked, logs aggregated.

If err: Slack webhook correct? SMTP set? Notification blocks loaded? SLA reasonable? Network blocking?

Step 6: CI/CD for pipelines

Version + automate.

# .github/workflows/deploy-pipeline.yml
name: Deploy ML Pipeline

on:
  push:
    branches: [main]
    paths:
      - 'pipelines/**'
# ... (see EXAMPLES.md for complete implementation)
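A cheap gate to run in CI before any deploy step: compile every pipeline file so a syntax error fails the build instead of the scheduler. A stdlib sketch; the `pipelines/` root matches the workflow's `paths` filter, while `find_syntax_errors` is hypothetical. It catches syntax errors only; missing imports still need an actual import test.

```python
from pathlib import Path


def find_syntax_errors(root: str) -> list:
    """Compile every .py file under root without executing it; return 'file:line: msg' strings."""
    errors = []
    for path in sorted(Path(root).rglob("*.py")):
        source = path.read_text(encoding="utf-8")
        try:
            compile(source, str(path), "exec")
        except SyntaxError as exc:
            errors.append(f"{path}:{exc.lineno}: {exc.msg}")
    return errors
```

A CI step would call `find_syntax_errors("pipelines")` and exit non-zero when the list is not empty.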

→ Tests pass before deploy, auto deploy to prod, team notified on success, versioning in Git.

If err: test coverage + failures, Prefect Cloud creds, deploy script error handling, Slack webhook, CI logs.

Check

  • Framework installed + running
  • DAG defined w/ correct deps
  • All tasks in proper order
  • Retry logic works
  • Scheduled runs on time
  • MLflow logs experiments
  • DVC versions data
  • Parallel tasks concurrent
  • Conditional branches work
  • Monitoring + alerting functional
  • CI/CD auto deploys
  • Reproducible across envs

Traps

  • Circular deps: A→B, B→A → design DAG carefully, use validators
  • Memory leaks: long tasks accumulate → timeouts, monitor, restart workers
  • XCom size limits: big data in XCom → pass file paths/S3 URIs, not serialized data
  • Timezone confusion: wrong times → always UTC, explicit tz in schedule
  • Missing retries: transient = perm fail → exponential backoff retries
  • Tight coupling: deps on impl details → clear interfaces, explicit params
  • No idempotency: re-run = dupes/errors → design idempotent (safe retry)
  • Poor error handling: no context → detailed logs, capture exceptions
  • Resource contention: parallel overwhelms → limit concurrency, quotas
  • Version conflicts: incompatible deps per task → Docker containers for isolation
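For the idempotency trap, one common pattern is to key each output by the run's logical date and write it atomically, so a retry or backfill overwrites the same artifact instead of appending a duplicate. A stdlib sketch; the file layout and `write_partition` are hypothetical.

```python
import json
import os
import tempfile
from pathlib import Path


def write_partition(out_dir: str, run_date: str, records: list) -> Path:
    """Write records for one logical date; re-running replaces, never duplicates."""
    target = Path(out_dir) / f"predictions_{run_date}.json"
    target.parent.mkdir(parents=True, exist_ok=True)
    # Write to a temp file in the same directory, then swap it in with os.replace,
    # so readers never see a half-written file
    fd, tmp = tempfile.mkstemp(dir=target.parent, suffix=".tmp")
    with os.fdopen(fd, "w") as f:
        json.dump(records, f)
    os.replace(tmp, target)
    return target
```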

Related

  • track-ml-experiments — MLflow tracking in tasks
  • version-ml-data — DVC for data versioning
  • build-feature-store — materialize features as task
  • deploy-ml-model-serving — deploy as final stage
  • deploy-to-kubernetes — run pipelines on K8s

GitHub repository

pjt222/agent-almanac
Path: i18n/caveman-ultra/skills/orchestrate-ml-pipeline
Tags: agents, agentskills, ai-assisted-development, claude-code, skills, teams

Related skills

llamaguard

Other

LlamaGuard is Meta's 7-8B-parameter model for moderating LLM inputs and outputs across six safety categories, such as violence and hate speech. It delivers 94-95% accuracy and can be deployed with vLLM, Hugging Face, or Amazon SageMaker. Use this skill to integrate content filtering and safety guardrails into your AI applications.


cost-optimization

Other

This Claude Skill helps developers optimize cloud costs through resource rightsizing, tagging strategies, and spend analysis. It provides a framework for reducing cloud spend and implementing cost governance on AWS, Azure, and GCP. Use it when you need to analyze infrastructure costs, rightsize resources, or stay within budget constraints.


quantizing-models-bitsandbytes

Other

This skill quantizes LLMs to 8-bit or 4-bit precision with bitsandbytes, reducing memory by 50-75% with minimal accuracy loss. It is ideal for running larger models in limited GPU memory or for speeding up inference, and supports formats such as INT8, NF4, and FP4. The skill integrates with HuggingFace Transformers and enables QLoRA training and 8-bit optimizers.


dispatching-parallel-agents

Other

This Claude Skill dispatches multiple agents to investigate and fix 3 or more independent problems concurrently. It is designed for scenarios with unrelated failures that can be resolved without shared state or dependencies. Its core capability is parallel problem solving: one agent per independent problem domain, to maximize efficiency.
