orchestrate-ml-pipeline

pjt222
Updated yesterday
Category: Other. Tags: ai, automation, data

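About

This skill orchestrates end-to-end machine learning pipelines with Prefect or Airflow, handling DAG construction, task dependencies, scheduling, and retry logic. It integrates with ML tools such as MLflow and DVC to support production workflows. Use it to automate multi-step ML processes from data ingestion through deployment, or to manage periodic model retraining.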

Quick Install

Claude Code

Recommended (default)
npx skills add pjt222/agent-almanac -a claude-code
Plugin command (alternative)
/plugin add https://github.com/pjt222/agent-almanac
Git clone (alternative)
git clone https://github.com/pjt222/agent-almanac.git ~/.claude/skills/orchestrate-ml-pipeline

Copy and paste one of these commands into Claude Code to install the skill.

Documentation

Orchestrate ML Pipeline

See Extended Examples for complete configuration files and templates.

End-to-end ML pipelines: deps, scheduling, monitoring.

Use When

  • Automate multi-step ML (ingest → deploy)
  • Schedule periodic retraining
  • Coord distributed data + training
  • Complex stage deps
  • Retry + failure recovery
  • Monitor execution + alerting
  • Orchestrate feature eng/train/eval/deploy
  • Reproducible across envs

In

  • Required: Pipeline components (ingest, preprocess, train, eval)
  • Required: Framework (Prefect, Airflow, Kubeflow)
  • Required: Python env w/ orchestration lib
  • Optional: K8s cluster (distributed)
  • Optional: MLflow tracking server
  • Optional: DVC for data versioning
  • Optional: Slack/email alerts
  • Optional: Monitoring (Prometheus, Grafana)
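
A quick way to confirm the "Python env w/ orchestration lib" requirement is to ask the interpreter which framework modules it can find. This is a minimal sketch using only the standard library; the mapping of framework names to import names is an assumption (`kfp` is the import name of the Kubeflow Pipelines SDK).

```python
from importlib.util import find_spec

# Assumed import names for the frameworks listed above.
FRAMEWORK_MODULES = {"Prefect": "prefect", "Airflow": "airflow", "Kubeflow": "kfp"}


def installed_frameworks(modules=FRAMEWORK_MODULES):
    """Return the names of frameworks whose top-level module can be found."""
    return [name for name, module in modules.items() if find_spec(module) is not None]


if __name__ == "__main__":
    found = installed_frameworks()
    print("Installed:", ", ".join(found) if found else "none")
```

An empty result means none of the three is importable from the active environment, which usually points at the wrong virtualenv rather than a broken install.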

Do

Step 1: Pick + install framework

Select + setup infra.

# Option 1: Prefect (modern, Pythonic, simpler)
pip install prefect
pip install prefect-aws prefect-dask prefect-docker

# Start Prefect server (local development)
prefect server start

# Or use Prefect Cloud (managed)
# ... (see EXAMPLES.md for complete implementation)

Docker Compose for Airflow:

# docker-compose.airflow.yml
version: '3.8'

x-airflow-common: &airflow-common
  image: apache/airflow:2.8.0
  environment:
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
# ... (see EXAMPLES.md for complete implementation)

→ Framework installed, UI accessible (Prefect http://localhost:4200, Airflow http://localhost:8080), DB initialized, scheduler running.

If err: ports (netstat -tulpn | grep 8080), DB conn, Redis for Celery, Python ≥3.8 for Airflow, Docker daemon for containerized, init logs.
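
The port check above can also be done from Python, which helps on hosts without `netstat`. The sketch below only tests whether something accepts TCP connections on the two UI ports named in this step; it does not verify that the listener is actually Prefect or Airflow. The helper name `port_open` is illustrative.

```python
import socket


def port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts.
        return False


if __name__ == "__main__":
    for name, port in {"Prefect UI": 4200, "Airflow UI": 8080}.items():
        state = "listening" if port_open("127.0.0.1", port) else "not reachable"
        print(f"{name} (port {port}): {state}")
```

If a port reports "listening" before the framework has been started, another process already holds it and the server will fail to bind.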

Step 2: ML pipeline w/ Prefect

Flow w/ tasks per stage.

# prefect_ml_pipeline.py
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
import pandas as pd
import mlflow
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
# ... (see EXAMPLES.md for complete implementation)

Deploy + schedule:

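# deploy_prefect.py
# Note: Deployment.build_from_flow is the Prefect 2.x API. Prefect 3 removed it
# in favor of flow.serve() / flow.deploy(), so pin prefect<3 to use this as written.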
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule
from prefect_ml_pipeline import ml_training_pipeline

# Create deployment with schedule
deployment = Deployment.build_from_flow(
    flow=ml_training_pipeline,
# ... (see EXAMPLES.md for complete implementation)

→ Flow runs all tasks in order, retries auto on failure, success = green in UI, MLflow logs, model registered + deployed.

If err: deps defined? MLflow accessible? Data paths correct? Circular deps? Task timeout? Prefect logs. Resources (mem/CPU)?
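
The "deps defined?" and "circular deps?" checks can be tested outside any framework. This is a framework-agnostic sketch, not Prefect code: it uses the standard library's `graphlib` (Python 3.9+) to compute a run order and reject cycles. The stage names and the `execution_order` helper are hypothetical.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical stages; each key maps to the set of stages it depends on.
STAGES = {
    "ingest": set(),
    "preprocess": {"ingest"},
    "train": {"preprocess"},
    "evaluate": {"train"},
    "deploy": {"evaluate"},
}


def execution_order(deps):
    """Return a valid run order, or raise ValueError on a circular dependency."""
    try:
        return list(TopologicalSorter(deps).static_order())
    except CycleError as exc:
        # The second element of args holds the nodes that form the cycle.
        raise ValueError(f"circular dependency: {exc.args[1]}") from exc


print(execution_order(STAGES))
# ['ingest', 'preprocess', 'train', 'evaluate', 'deploy']
```

Running the same function on `{"a": {"b"}, "b": {"a"}}` raises `ValueError`, which is the failure to catch before a flow is deployed.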

Step 3: ML pipeline w/ Airflow

DAG for prod ML.

# dags/ml_training_dag.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.utils.dates import days_ago  # deprecated in Airflow 2.x; a fixed start_date is preferred
from datetime import datetime, timedelta
import mlflow
import pandas as pd
# ... (see EXAMPLES.md for complete implementation)

→ DAG in Airflow UI, scheduled runs on time, retries + alerts on failure, XCom passes between tasks, MLflow logs.

If err: syntax check (python dags/ml_training_dag.py), imports avail in env, XCom size (use file paths for big data), email config, scheduler running, task logs in UI.
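
The XCom size advice comes down to one pattern: the upstream task writes data to storage and hands over only the path. The sketch below shows that pattern with plain functions and the standard library. The function names and CSV layout are hypothetical, and a local temp directory stands in for shared storage (workers on different machines would need a shared volume or an object store).

```python
import csv
import tempfile
from pathlib import Path


def extract(rows, out_dir):
    """Write rows to disk and return only the file path (small, safe to pass on)."""
    path = Path(out_dir) / "raw.csv"
    with path.open("w", newline="") as fh:
        writer = csv.writer(fh)
        writer.writerow(["id", "value"])
        writer.writerows(rows)
    return str(path)


def count_rows(path):
    """Downstream step: load the data from the path it was handed."""
    with open(path, newline="") as fh:
        return sum(1 for _ in csv.DictReader(fh))


with tempfile.TemporaryDirectory() as tmp:
    data_path = extract([(1, 0.5), (2, 0.7), (3, 0.9)], tmp)  # this string is what would travel via XCom
    print(count_rows(data_path))  # 3
```

In an Airflow task, the return value of `extract` would be what gets pushed to XCom, so the metadata database stores a short string instead of the dataset.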

Step 4: Advanced features

Dynamic DAGs, branching, parallel.

# advanced_pipeline.py (Prefect)
from prefect import flow, task
from prefect.task_runners import DaskTaskRunner, ConcurrentTaskRunner
import time

@task
def process_shard(shard_id: int, data: list) -> dict:
    """Process data shard in parallel."""
# ... (see EXAMPLES.md for complete implementation)

Airflow branching:

# Airflow branching with BranchPythonOperator
import pandas as pd
from airflow.operators.python import BranchPythonOperator

def check_data_quality(**context):
    """Decide which branch to take."""
    data_path = context['ti'].xcom_pull(key='data_path')
    df = pd.read_csv(data_path)

# ... (see EXAMPLES.md for complete implementation)

→ Parallel tasks concurrent (faster), conditional branches by logic, dynamic gen works, Dask distributes.

If err: Dask cluster configured + accessible? task_runner specified? Branches return valid task IDs? Resource contention w/ parallel? Conditional logic correct?
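
Both ideas in this step can be tried without a Dask cluster or an Airflow scheduler. The sketch below is a stdlib stand-in, not the elided implementation: it runs shards through a thread pool with a concurrency cap, and shows a branch function that returns the ID of the next task, which is the contract a `BranchPythonOperator` callable has to meet. The task IDs, threshold, and per-shard work are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor


def process_shard(shard_id, data):
    """Stand-in for per-shard work: sum the shard's values."""
    return {"shard_id": shard_id, "total": sum(data)}


def run_shards(shards, max_workers=2):
    """Process shards concurrently, capping concurrency at max_workers."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(process_shard, i, s) for i, s in enumerate(shards)]
        return [f.result() for f in futures]  # results in submission order


def choose_branch(null_fraction, threshold=0.1):
    """Return the ID of the task to run next (hypothetical task IDs)."""
    return "clean_data" if null_fraction > threshold else "train_model"


results = run_shards([[1, 2], [3, 4], [5]])
print([r["total"] for r in results])  # [3, 7, 5]
print(choose_branch(0.25))            # clean_data
```

The `max_workers` cap is the same lever as a framework concurrency limit: raising it speeds things up until shards start competing for memory or CPU.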

Step 5: Monitoring + alerting

Failure notifications + comprehensive monitoring.

# monitoring_integration.py
from prefect.blocks.notifications import SlackWebhook
from prefect import flow, task, get_run_logger
from prefect.context import FlowRunContext

# notify_failure is a failure hook; it must be defined before this decorator is evaluated
@task(on_failure=[notify_failure])
def critical_task():
    """Task with failure notification."""
# ... (see EXAMPLES.md for complete implementation)

Airflow monitoring w/ sensors:

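# Airflow SLA and monitoring
from datetime import timedelta  # needed for the 'sla' value below

from airflow.sensors.base import BaseSensorOperator
from airflow.utils.decorators import apply_defaults  # no longer needed in Airflow 2.x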

default_args = {
    'sla': timedelta(hours=4),  # Alert if task exceeds 4 hours
    'on_failure_callback': slack_alert_failure,
    'on_success_callback': slack_alert_success,
# ... (see EXAMPLES.md for complete implementation)

→ Slack/email on failures, SLA breach = alert, custom metrics tracked, logs aggregated.

If err: Slack webhook correct? SMTP set? Notification blocks loaded? SLA reasonable? Network blocking?
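
When checking "Slack webhook correct?", it helps to separate building the message from sending it. The sketch below uses only the standard library and relies on Slack incoming webhooks accepting a JSON body with a `text` field. The function names and message format are illustrative, and the webhook URL is something you supply. Only the payload builder runs here, so no network access is needed.

```python
import json
import urllib.request


def build_slack_payload(flow_name, task_name, error):
    """Build the JSON body for a Slack incoming-webhook message."""
    text = f":x: {flow_name} / {task_name} failed: {error}"
    return json.dumps({"text": text}).encode("utf-8")


def send_alert(webhook_url, payload, timeout=5.0):
    """POST the payload to the webhook and return the HTTP status code."""
    request = urllib.request.Request(
        webhook_url,
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status


payload = build_slack_payload("ml_training_pipeline", "train", "out of memory")
print(payload.decode("utf-8"))
```

Calling `send_alert` with a real webhook URL from the pipeline host is a direct test of both the URL and any network blocking.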

Step 6: CI/CD for pipelines

Version + automate.

# .github/workflows/deploy-pipeline.yml
name: Deploy ML Pipeline

on:
  push:
    branches: [main]
    paths:
      - 'pipelines/**'
# ... (see EXAMPLES.md for complete implementation)

→ Tests pass before deploy, auto deploy to prod, team notified on success, versioning in Git.

If err: test coverage + failures, Prefect Cloud creds, deploy script error handling, Slack webhook, CI logs.
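
One cheap gate to run before the deploy job is a syntax check over every pipeline file. This is a minimal sketch, assuming the pipeline code is plain `.py` files under one directory; `find_syntax_errors` is a hypothetical helper. It compiles files without executing them, so it catches syntax errors but not missing imports or bad DAG wiring.

```python
import tempfile
from pathlib import Path


def find_syntax_errors(root):
    """Compile every .py file under root; return {path: message} for failures."""
    errors = {}
    for path in sorted(Path(root).rglob("*.py")):
        try:
            compile(path.read_text(encoding="utf-8"), str(path), "exec")
        except SyntaxError as exc:
            errors[str(path)] = f"line {exc.lineno}: {exc.msg}"
    return errors


# Demo on a throwaway directory with one valid and one broken file.
with tempfile.TemporaryDirectory() as tmp:
    Path(tmp, "good_flow.py").write_text("x = 1\n", encoding="utf-8")
    Path(tmp, "bad_flow.py").write_text("def broken(:\n    pass\n", encoding="utf-8")
    failures = find_syntax_errors(tmp)
    print(sorted(Path(p).name for p in failures))  # ['bad_flow.py']
```

In a CI step, the script would exit non-zero whenever the returned dict is non-empty, so the deploy stage never runs against a file that cannot be parsed.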

Check

  • Framework installed + running
  • DAG defined w/ correct deps
  • All tasks in proper order
  • Retry logic works
  • Scheduled runs on time
  • MLflow logs experiments
  • DVC versions data
  • Parallel tasks concurrent
  • Conditional branches work
  • Monitoring + alerting functional
  • CI/CD auto deploys
  • Reproducible across envs

Traps

  • Circular deps: A→B, B→A → design DAG carefully, use validators
  • Memory leaks: long tasks accumulate → timeouts, monitor, restart workers
  • XCom size limits: passing big data → use file paths/S3, not direct serialization
  • Timezone confusion: wrong times → always UTC, explicit tz in schedule
  • Missing retries: transient = perm fail → exponential backoff retries
  • Tight coupling: deps on impl details → clear interfaces, explicit params
  • No idempotency: re-run = dupes/errors → design idempotent (safe retry)
  • Poor error handling: no context → detailed logs, capture exceptions
  • Resource contention: parallel overwhelms → limit concurrency, quotas
  • Version conflicts: incompatible deps per task → Docker containers for isolation
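
The "exponential backoff retries" fix from the list above can be sketched in a few lines. Prefect and Airflow both provide retry settings of their own, so this stdlib version only illustrates the mechanism. The `retry` helper, its defaults, and the `flaky` function are hypothetical. The `sleep` parameter is injectable so the example runs instantly.

```python
import time


def retry(fn, attempts=3, base_delay=1.0, factor=2.0, sleep=time.sleep):
    """Call fn(); on an exception wait base_delay * factor**n, then try again.

    Re-raises the last exception once `attempts` calls have failed.
    """
    for n in range(attempts):
        try:
            return fn()
        except Exception:
            if n == attempts - 1:
                raise
            sleep(base_delay * factor ** n)


calls = {"count": 0}


def flaky():
    """Fails twice, then succeeds (simulates a transient error)."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("transient")
    return "ok"


delays = []
print(retry(flaky, sleep=delays.append))  # ok
print(delays)                             # [1.0, 2.0]
```

Retries like this are only safe when the wrapped step is idempotent, which is why the two traps are listed together.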

Related

  • track-ml-experiments: MLflow tracking in tasks
  • version-ml-data: DVC for data versioning
  • build-feature-store: materialize features as task
  • deploy-ml-model-serving: deploy as final stage
  • deploy-to-kubernetes: run pipelines on K8s

GitHub repository

pjt222/agent-almanac
Path: i18n/caveman-ultra/skills/orchestrate-ml-pipeline
