
orchestrate-ml-pipeline

pjt222
Updated 2 days ago
Category: Other · Tags: ai, automation, data

About

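This Claude skill orchestrates production ML pipelines with Prefect or Airflow, using DAG construction, task dependencies, and retry logic. It handles scheduling and monitoring, and integrates with MLflow, DVC, and feature stores to support end-to-end workflows. Use it to automate multi-stage ML processes from data ingestion to deployment, including periodic retraining and failure recovery.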

Quick Install

Claude Code

Recommended (default)
npx skills add pjt222/agent-almanac -a claude-code
Plugin command (alternative)
/plugin add https://github.com/pjt222/agent-almanac
Git clone (alternative)
git clone https://github.com/pjt222/agent-almanac.git ~/.claude/skills/orchestrate-ml-pipeline

Copy one of these commands and paste it into Claude Code to install the skill.

Documentation

Orchestrate ML Pipeline

See Extended Examples for complete configuration files and templates.

Build and orchestrate end-to-end machine learning pipelines with dependency management, scheduling, and monitoring.

When to Use

  • Automating multi-step ML workflows from data ingestion to deployment
  • Scheduling periodic model retraining on fresh data
  • Coordinating distributed data processing and training tasks
  • Implementing complex dependencies between ML pipeline stages
  • Managing retry logic and failure recovery
  • Monitoring pipeline execution and alerting on failures
  • Orchestrating feature engineering, training, evaluation, and deployment
  • Building reproducible ML workflows across environments

Inputs

  • Required: ML pipeline components (data ingestion, preprocessing, training, evaluation)
  • Required: Choice of orchestration framework (Prefect, Airflow, Kubeflow)
  • Required: A Python environment with the orchestration library installed
  • Optional: Kubernetes cluster for distributed execution
  • Optional: MLflow tracking server for experiment logging
  • Optional: DVC for data versioning
  • Optional: Slack/email for alerting
  • Optional: Monitoring infrastructure (Prometheus, Grafana)

Procedure

Step 1: Choose and Install an Orchestration Framework

Select an appropriate framework and set up its infrastructure.

# Option 1: Prefect (modern, Pythonic, simpler)
pip install prefect
pip install prefect-aws prefect-dask prefect-docker

# Start Prefect server (local development)
prefect server start

# Or use Prefect Cloud (managed)
# ... (see EXAMPLES.md for complete implementation)

Docker Compose for Airflow:

# docker-compose.airflow.yml
version: '3.8'

x-airflow-common: &airflow-common
  image: apache/airflow:2.8.0
  environment:
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
# ... (see EXAMPLES.md for complete implementation)

Expected: The orchestration framework is installed, the web UI is accessible (Prefect at http://localhost:4200, Airflow at http://localhost:8080), the database is initialized, and the scheduler is running.

On failure: Check port availability (netstat -tulpn | grep 8080 for Airflow, 4200 for Prefect), verify the database connection, ensure Redis is running for the CeleryExecutor, check Python version compatibility (Airflow requires Python ≥3.8), confirm the Docker daemon is running for containerized setups, and inspect the logs for initialization errors.
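Where `netstat -tulpn` is unavailable (its flags differ on macOS and Windows), a minimal stdlib sketch can test the default UI ports by attempting a TCP connection. The helper name `port_in_use` is hypothetical; a port that is already in use before you start a server points to a conflict.

```python
import socket


def port_in_use(port: int, host: str = "127.0.0.1", timeout: float = 0.5) -> bool:
    """Return True if something is already accepting TCP connections on host:port."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(timeout)
        # connect_ex returns 0 on success and an error code instead of raising
        return sock.connect_ex((host, port)) == 0


if __name__ == "__main__":
    # Default ports from the expected result above: Prefect UI 4200, Airflow UI 8080
    for name, port in [("Prefect", 4200), ("Airflow", 8080)]:
        state = "in use" if port_in_use(port) else "free"
        print(f"{name} port {port}: {state}")
```

Run it once before starting the servers (both ports should be free) and once after (both should be in use).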

Step 2: Build an ML Pipeline with Prefect

Create a Prefect flow with one task per pipeline stage.

# prefect_ml_pipeline.py
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
import pandas as pd
import mlflow
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
# ... (see EXAMPLES.md for complete implementation)
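The imports include `task_input_hash`, which a Prefect task can use as its `cache_key_fn` (for example `@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))`) so that a call with identical inputs reuses an earlier result. The plain-Python sketch below imitates that idea to make the behavior concrete. It is an in-memory, single-process illustration, not Prefect's implementation, and `cache_by_inputs` and `load_data` are hypothetical names.

```python
import hashlib
import json
import time
from datetime import timedelta
from functools import wraps


def cache_by_inputs(expiration: timedelta):
    """Reuse a result when the function is called again with the same
    arguments within `expiration` (in-memory, single process)."""
    def decorator(fn):
        store = {}  # cache key -> (monotonic timestamp, result)

        @wraps(fn)
        def wrapper(*args, **kwargs):
            # Key = hash of the function name plus its JSON-serialized arguments
            payload = json.dumps([fn.__name__, args, kwargs], sort_keys=True, default=str)
            key = hashlib.sha256(payload.encode("utf-8")).hexdigest()
            cached = store.get(key)
            if cached is not None and time.monotonic() - cached[0] < expiration.total_seconds():
                return cached[1]  # cache hit: skip the real work
            result = fn(*args, **kwargs)
            store[key] = (time.monotonic(), result)
            return result

        return wrapper

    return decorator


executions = []  # records real (non-cached) calls for demonstration


@cache_by_inputs(expiration=timedelta(hours=1))
def load_data(path: str) -> list:
    """Hypothetical loader; a real task would read `path` from storage."""
    executions.append(path)
    return [1, 2, 3]
```

Calling `load_data("data/raw.csv")` twice executes the body once; a different path is a different key and runs again.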

Deploy and schedule:

# deploy_prefect.py
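# Note: Deployment.build_from_flow is deprecated in later Prefect 2.x releases and
# removed in Prefect 3, where flow.serve() or flow.deploy() replace it
from prefect.deployments import Deployment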
from prefect.server.schemas.schedules import CronSchedule
from prefect_ml_pipeline import ml_training_pipeline

# Create deployment with schedule
deployment = Deployment.build_from_flow(
    flow=ml_training_pipeline,
# ... (see EXAMPLES.md for complete implementation)
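A cron expression only fixes a wall-clock time; which instant it refers to depends on the timezone it is evaluated in. This plain-Python sketch (the name `next_daily_run` is hypothetical and not a Prefect API) resolves a daily "HH:MM" schedule in UTC and refuses naive datetimes, one way to keep scheduled runs from firing at unexpected times.

```python
from datetime import datetime, timedelta, timezone


def next_daily_run(now: datetime, hour: int, minute: int = 0) -> datetime:
    """Next HH:MM UTC strictly after `now`, like the cron 'M H * * *' evaluated in UTC."""
    if now.tzinfo is None:
        # A naive datetime has no defined instant; reject it instead of guessing
        raise ValueError("use a timezone-aware datetime")
    now_utc = now.astimezone(timezone.utc)
    candidate = now_utc.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now_utc:
        candidate += timedelta(days=1)  # today's slot has passed; use tomorrow's
    return candidate
```

For example, `next_daily_run(datetime.now(timezone.utc), hour=2)` gives the next firing time of a `0 2 * * *` schedule interpreted in UTC.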

Expected: The Prefect flow executes all tasks in the correct order, failed tasks are retried automatically, successful runs show as completed (green) in the UI, MLflow logs the experiments, and the model is registered and deployed.

On failure: Check that task dependencies are defined correctly and contain no cycles, verify the MLflow server is reachable, confirm the data source paths, check task timeout limits, inspect the Prefect logs for detailed errors, and check resource availability (memory/CPU).
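Prefect's `@task` decorator accepts `retries` and `retry_delay_seconds`, so transient failures are normally handled by configuration rather than hand-written loops. To show what exponential backoff does, here is a framework-free sketch; `run_with_retries` is a hypothetical helper, and `sleep` is injectable so the delays can be checked without actually waiting.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_retries(
    fn: Callable[[], T],
    retries: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn; on an exception wait base_delay, 2*base_delay, 4*base_delay, ... and retry."""
    if retries < 0:
        raise ValueError("retries must be >= 0")
    for attempt in range(retries + 1):
        try:
            return fn()
        except Exception:
            if attempt == retries:
                raise  # out of retries: surface the original error
            sleep(base_delay * 2 ** attempt)
    raise AssertionError("unreachable")
```

Only transient errors (network timeouts, a briefly unavailable MLflow server) should be retried this way; a bug in the task will fail on every attempt.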

Step 3: Build an ML Pipeline with Airflow

Create an Airflow DAG for a production ML workflow.

# dags/ml_training_dag.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta
import mlflow
import pandas as pd
# ... (see EXAMPLES.md for complete implementation)
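Airflow rejects a DAG whose task dependencies form a cycle, and by default a task runs only after all of its upstream tasks succeed. The stdlib `graphlib` module expresses the same constraint. This sketch uses hypothetical task names for the stages listed above, computes a valid execution order, and turns a cycle into a readable error.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical task names; each key maps to the set of tasks it depends on (upstream)
ML_DAG = {
    "ingest_data": set(),
    "validate_data": {"ingest_data"},
    "preprocess": {"validate_data"},
    "train_model": {"preprocess"},
    "evaluate_model": {"train_model"},
    "register_model": {"evaluate_model"},
}


def execution_order(dag: dict[str, set[str]]) -> list[str]:
    """Return an order that respects every dependency, or raise ValueError on a cycle."""
    try:
        return list(TopologicalSorter(dag).static_order())
    except CycleError as exc:
        # graphlib puts the nodes forming the cycle in exc.args[1]
        raise ValueError(f"circular dependency: {exc.args[1]}") from exc
```

Running the same check in CI catches an accidental `train_model >> preprocess` style loop before the scheduler ever sees it.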

Expected: The DAG appears in the Airflow UI, scheduled runs execute on time, task failures trigger retries and alerts, XCom passes data between tasks, and the MLflow integration logs experiments.

On failure: Check the DAG file for syntax errors (python dags/ml_training_dag.py), verify that all imports are available in the Airflow environment, keep XCom payloads within size limits (pass file paths for large data), check the email configuration used for alerts, verify the scheduler is running, and inspect task logs in the Airflow UI.
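A PythonOperator's return value is pushed to XCom, which is stored in the Airflow metadata database, so large DataFrames do not belong there. A minimal sketch of the usual workaround, assuming a filesystem that all workers share (in production this is typically object storage such as S3): the upstream function writes its output and returns only the path. The function names are hypothetical.

```python
import json
from pathlib import Path


def produce_features(run_id: str, rows: list[dict], base_dir: Path) -> str:
    """Upstream step: persist the data and return only its path (small, XCom-friendly)."""
    out = base_dir / run_id / "features.json"
    out.parent.mkdir(parents=True, exist_ok=True)
    out.write_text(json.dumps(rows))
    return str(out)


def consume_features(path: str) -> list[dict]:
    """Downstream step: receive the path (e.g. via xcom_pull) and load the data itself."""
    return json.loads(Path(path).read_text())
```

Keying the path by run ID also keeps concurrent or repeated runs from overwriting each other's intermediate data.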

Step 4: Implement Advanced Features

Add dynamic DAGs, branching, and parallel execution.

# advanced_pipeline.py (Prefect)
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner
from prefect_dask import DaskTaskRunner  # provided by the prefect-dask package
import time

@task
def process_shard(shard_id: int, data: list) -> dict:
    """Process data shard in parallel."""
# ... (see EXAMPLES.md for complete implementation)
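The `process_shard(shard_id, data)` signature above implies a fan-out/fan-in pattern. Before wiring it into Prefect (for example through task mapping with a `ConcurrentTaskRunner` or `DaskTaskRunner`), the shape of the computation can be sketched with the standard library. Here the shard body is a hypothetical stand-in, and `max_workers` plays the role of a concurrency limit. Threads suit I/O-bound shards; CPU-bound work would need processes or a Dask cluster.

```python
from concurrent.futures import ThreadPoolExecutor


def process_shard(shard_id: int, data: list) -> dict:
    """Hypothetical per-shard work: summarize one slice of the data."""
    return {"shard_id": shard_id, "count": len(data), "total": sum(data)}


def split(data: list, n_shards: int) -> list[list]:
    """Round-robin split into n_shards lists."""
    return [data[i::n_shards] for i in range(n_shards)]


def run_sharded(data: list, n_shards: int = 4, max_workers: int = 4) -> list[dict]:
    """Fan out one call per shard, then gather the results in shard order."""
    shards = split(data, n_shards)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map yields results in input order even if shards finish out of order
        return list(pool.map(process_shard, range(n_shards), shards))
```

Capping `max_workers` is also the simplest guard against the resource contention that parallel tasks can cause.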

Airflow branching:

# Airflow branching with BranchPythonOperator
import pandas as pd
from airflow.operators.python import BranchPythonOperator

def check_data_quality(**context):
    """Decide which branch to take."""
    data_path = context['ti'].xcom_pull(key='data_path')
    df = pd.read_csv(data_path)

# ... (see EXAMPLES.md for complete implementation)
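With BranchPythonOperator, the callable returns the task_id (or list of task_ids) to follow, and the other directly downstream tasks are skipped. Keeping the decision in a plain function makes it easy to unit-test. In this sketch the task IDs `train_model` and `alert_data_quality`, the null-ratio rule, and the 5% threshold are all assumptions, and the rows are plain dicts rather than the DataFrame the snippet above loads.

```python
def choose_branch(rows: list[dict], required: list[str], max_null_ratio: float = 0.05) -> str:
    """Pick the downstream task_id, in the style of a BranchPythonOperator callable.

    rows: records to check; required: columns that must be populated.
    """
    if not required:
        raise ValueError("required must name at least one column")
    if not rows:
        return "alert_data_quality"  # no data at all: do not train
    total_cells = len(rows) * len(required)
    missing = sum(1 for row in rows for col in required if row.get(col) is None)
    if missing / total_cells <= max_null_ratio:
        return "train_model"
    return "alert_data_quality"
```

Whatever IDs the function returns must match tasks that actually exist downstream of the branch operator.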

Expected: Parallel tasks run concurrently (shortening the pipeline), conditional branches execute according to the branching logic, dynamic task generation works, and the Dask cluster distributes the work.

On failure: Check that the Dask cluster is configured and reachable, verify the flow specifies a task_runner, ensure the branch callable returns valid task IDs, check for resource contention among parallel tasks, and verify the conditional logic.

Step 5: Integrate Monitoring and Alerting

Add monitoring and failure notifications.

# monitoring_integration.py
from prefect.blocks.notifications import SlackWebhook
from prefect import flow, task, get_run_logger
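from prefect.context import FlowRunContext


def notify_failure(task_obj, task_run, state):
    """Task on_failure hook; Prefect calls it with (task, task_run, state)."""
    # "ml-pipeline-alerts" is a hypothetical SlackWebhook block name; create the block first
    slack = SlackWebhook.load("ml-pipeline-alerts")
    slack.notify(f"Task run {task_run.name} failed: {state.message}")


@task(on_failure=[notify_failure])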
def critical_task():
    """Task with failure notification."""
# ... (see EXAMPLES.md for complete implementation)
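Outside Prefect's `SlackWebhook` block (for instance, from an Airflow `on_failure_callback`), a Slack incoming webhook can be called directly: it accepts an HTTP POST with a JSON body whose `text` field is shown as the message. A dependency-free sketch; the message format and function names are hypothetical, and the webhook URL must come from your own Slack app configuration.

```python
import json
import urllib.request


def build_failure_alert(flow: str, task: str, run_id: str, error: str) -> dict:
    """Build an incoming-webhook payload; Slack displays the "text" field."""
    return {"text": f"[FAILED] {flow}/{task} (run {run_id}): {error}"}


def post_alert(webhook_url: str, payload: dict, timeout: float = 10.0) -> int:
    """POST the payload as JSON and return the HTTP status code."""
    req = urllib.request.Request(
        webhook_url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return resp.status
```

Separating payload construction from sending keeps the message format testable without network access.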

Airflow monitoring with SLAs, callbacks, and sensors:

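# Airflow SLA and monitoring
from datetime import timedelta

from airflow.sensors.base import BaseSensorOperator
from airflow.utils.decorators import apply_defaults  # deprecated and optional since Airflow 2.0

# slack_alert_failure and slack_alert_success must be defined before default_args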

default_args = {
    'sla': timedelta(hours=4),  # Alert if task exceeds 4 hours
    'on_failure_callback': slack_alert_failure,
    'on_success_callback': slack_alert_success,
# ... (see EXAMPLES.md for complete implementation)
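As a rough illustration of the SLA idea, this sketch flags a task whose elapsed time (finished, or still running as of `now`) exceeds its limit. It measures from the task's own start, which is a simplification: Airflow's SLA-miss logic is anchored to the DAG run rather than to each task's start, so treat `sla_missed` as a hypothetical helper for custom checks, not a reimplementation.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def sla_missed(
    started_at: datetime,
    sla: timedelta,
    finished_at: Optional[datetime] = None,
    now: Optional[datetime] = None,
) -> bool:
    """True if the task ran, or has been running, longer than `sla`.

    Use timezone-aware datetimes; if the task is unfinished, elapsed time is
    measured up to `now` (defaulting to the current UTC time).
    """
    if finished_at is None:
        finished_at = now if now is not None else datetime.now(timezone.utc)
    return finished_at - started_at > sla
```

With the 4-hour limit from the `default_args` above, a training task still running 4 hours and 1 minute after it started would be flagged.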

Expected: Slack/email notifications are sent on failures, SLA misses trigger alerts, custom metrics are tracked, and logs are aggregated in the monitoring system.

On failure: Verify the Slack webhook is configured correctly, check the email SMTP settings, ensure the notification blocks load properly, verify the SLA values are reasonable, and check for network issues blocking notifications.

Step 6: Implement CI/CD for Pipelines

Version-control the pipeline code and automate its deployment.

# .github/workflows/deploy-pipeline.yml
name: Deploy ML Pipeline

on:
  push:
    branches: [main]
    paths:
      - 'pipelines/**'
# ... (see EXAMPLES.md for complete implementation)

Expected: Pipeline tests pass before deployment, deployment to production is automated, the team is notified of successful deployments, and pipeline versions are tracked in Git.

On failure: Check test coverage and failures, verify the Prefect Cloud credentials, ensure the deployment script handles errors, check the Slack webhook configuration, and inspect the CI logs for deployment errors.
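The expected result "Pipeline tests pass before deployment" can start with a smoke test that every pipeline file imports cleanly, since a syntax error or missing dependency otherwise only surfaces when the scheduler parses the file. Airflow users often assert that `DagBag().import_errors` is empty; the framework-agnostic sketch below does the same for any directory of Python files. `find_import_errors` is hypothetical, and importing a module executes its top-level code, so pipeline modules should avoid side effects at import time.

```python
import importlib.util
from pathlib import Path


def find_import_errors(pipeline_dir: Path) -> dict[str, str]:
    """Import every .py file under pipeline_dir; map each failing file to its error."""
    errors = {}
    for path in sorted(pipeline_dir.rglob("*.py")):
        spec = importlib.util.spec_from_file_location(path.stem, path)
        module = importlib.util.module_from_spec(spec)
        try:
            spec.loader.exec_module(module)
        except Exception as exc:  # syntax errors, missing deps, failing top-level code
            errors[str(path)] = f"{type(exc).__name__}: {exc}"
    return errors
```

In the CI job this could run as `assert not find_import_errors(Path("pipelines"))`, matching the `pipelines/**` path filter in the workflow above.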

Validation Checklist

  • Orchestration framework installed and running
  • Pipeline DAG defined with correct dependencies
  • All tasks execute in proper order
  • Retry logic functions correctly on failures
  • Scheduled runs execute on time
  • MLflow integration logs experiments
  • DVC integration versions data
  • Parallel tasks execute concurrently
  • Conditional branches work correctly
  • Monitoring and alerting functional
  • CI/CD pipeline deploys automatically
  • Pipeline reproducible across environments

Common Pitfalls

  • Circular dependencies: Task A depends on B, B depends on A - carefully design DAG structure, use Airflow/Prefect validators
  • Memory leaks: Long-running tasks accumulate memory - set task timeouts, monitor resource usage, restart workers periodically
  • XCom size limits: Passing large data via XCom - use file paths or external storage (S3) instead of direct serialization
  • Timezone confusion: Schedule runs at wrong times - always use UTC, explicitly set timezone in schedule
  • Missing retries: Tasks fail permanently on transient errors - configure retries with exponential backoff
  • Tight coupling: Tasks directly depend on implementation details - use clear interfaces, pass parameters explicitly
  • No idempotency: Re-running tasks causes duplicates or errors - design tasks to be idempotent (safe to retry)
  • Poor error handling: Failures don't provide useful context - add detailed logging, capture exceptions properly
  • Resource contention: Parallel tasks overwhelm resources - limit concurrency, set resource quotas
  • Version conflicts: Different tasks need incompatible dependencies - use Docker containers for task isolation
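The retry and idempotency pitfalls above meet in one pattern: key each run's output by its logical run date and overwrite it atomically, so retries and backfills replace earlier output rather than duplicate it. A minimal local-filesystem sketch with hypothetical names; object stores need their own equivalent of the atomic rename used here.

```python
import json
import os
import tempfile
from pathlib import Path


def write_partition(output_dir: Path, run_date: str, rows: list[dict]) -> Path:
    """Write one run's output to a date-keyed file, replacing any earlier attempt.

    Re-running for the same run_date overwrites instead of appending, so a retry
    or backfill cannot create duplicates.
    """
    output_dir.mkdir(parents=True, exist_ok=True)
    target = output_dir / f"run_date={run_date}.json"
    # Write to a temp file in the same directory, then atomically swap it in,
    # so readers never see a half-written partition
    fd, tmp = tempfile.mkstemp(dir=output_dir, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as fh:
            json.dump(rows, fh)
        os.replace(tmp, target)
    except BaseException:
        Path(tmp).unlink(missing_ok=True)  # clean up the partial temp file
        raise
    return target
```

Running the task twice for the same date leaves a single file holding the latest result.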

Related Skills

  • track-ml-experiments - Integrate MLflow tracking into pipeline tasks
  • version-ml-data - Use DVC for data versioning in pipelines
  • build-feature-store - Materialize features as pipeline task
  • deploy-ml-model-serving - Add deployment as final pipeline stage
  • deploy-to-kubernetes - Run orchestrated pipelines on Kubernetes

GitHub Repository

pjt222/agent-almanac
Path: i18n/zh-CN/skills/orchestrate-ml-pipeline
Topics: agents, agentskills, ai-assisted-development, claude-code, skills, teams

Similar Skills

llamaguard

Other

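LlamaGuard is Meta's 7-8B-parameter model for moderating LLM inputs and outputs across six safety categories, including violence and hate speech. It offers 94-95% accuracy and can be deployed with vLLM, Hugging Face, or Amazon SageMaker. Use this skill to integrate content filtering and safety guardrails into AI applications.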


cost-optimization

Other

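This Claude skill helps developers optimize cloud costs through resource rightsizing, tagging strategies, and spend analysis. It provides a framework for reducing cloud costs and implementing cost governance on AWS, Azure, and GCP. Use it when analyzing infrastructure costs, rightsizing resources, or working within budget constraints.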


quantizing-models-bitsandbytes

Other

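This skill quantizes LLMs to 8-bit or 4-bit precision with bitsandbytes, cutting memory use by 50-75% with minimal accuracy loss. It is well suited to running larger models on limited GPU memory or speeding up inference, and supports formats such as INT8, NF4, and FP4. It integrates with Hugging Face Transformers to enable QLoRA training and 8-bit optimizers.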


dispatching-parallel-agents

Other

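This Claude skill dispatches multiple agents to investigate and resolve three or more independent problems concurrently. It is designed for unrelated failure scenarios that can be solved without shared state or dependencies. Its core capability is parallel problem solving: one agent is assigned to each independent problem domain to maximize efficiency.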
