MCP HubMCP Hub
스킬 목록으로 돌아가기

orchestrate-ml-pipeline

pjt222
업데이트됨 Yesterday
8 조회
17
2
17
GitHub에서 보기
기타aiautomationdata

정보

이 스킬은 Prefect나 Airflow를 사용하여 종단간(end-to-end) 머신러닝 파이프라인을 조정하며, DAG 구성, 작업 의존성, 스케줄링, 재시도 로직을 처리합니다. MLflow, DVC, 피처 스토어와 통합하여 프로덕션 워크플로우를 지원합니다. 데이터 수집부터 배포까지 다단계 ML 프로세스를 자동화하거나, 예약된 재학습 및 장애 복구를 관리할 때 사용하세요.

빠른 설치

Claude Code

추천
기본
npx skills add pjt222/agent-almanac -a claude-code
플러그인 명령대체
/plugin add https://github.com/pjt222/agent-almanac
Git 클론대체
git clone https://github.com/pjt222/agent-almanac.git ~/.claude/skills/orchestrate-ml-pipeline

Claude Code에서 이 명령을 복사하여 붙여넣어 스킬을 설치하세요

문서

編 ML 管道

詳例見 Extended Examples

以依管、排程、與察建並編端至端 ML 管道。

用時

  • 自動多步 ML 流自據入至部署乃用
  • 新據上排定期模型重訓乃用
  • 分布據處與訓任務之協調乃用
  • 行 ML 管階間複依乃用
  • 重試邏輯與失敗復管乃用
  • 察管道行與失敗警乃用
  • 編特徵工、訓、評、部之諸階乃用
  • 跨環境建可復 ML 流乃用

  • 必要:ML 管件(據入、預處、訓、評)
  • 必要:編框選(Prefect、Airflow、Kubeflow)
  • 必要:裝編庫之 Python 環
  • 可選:Kubernetes 群為分布行
  • 可選:MLflow 追蹤服以記實驗
  • 可選:DVC 為據版本
  • 可選:Slack/郵以警
  • 可選:察基(Prometheus、Grafana)

第一步:選並裝編框

擇宜框並設基設。

# Option 1: Prefect (modern, Pythonic, simpler)
pip install prefect
pip install prefect-aws prefect-dask prefect-docker

# Start Prefect server (local development)
prefect server start

# Or use Prefect Cloud (managed)
# ... (see EXAMPLES.md for complete implementation)

Airflow 之 Docker Compose:

# docker-compose.airflow.yml
version: '3.8'

x-airflow-common: &airflow-common
  image: apache/airflow:2.8.0
  environment:
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
# ... (see EXAMPLES.md for complete implementation)

得: 編框已裝,網 UI 可達(Prefect 於 http://localhost:4200,Airflowhttp://localhost:8080),庫已初,排程行。

敗則: 察端可用(netstat -tulpn | grep 8080),驗庫連,確 Celery 之 Redis 行,察 Python 版合(Airflow 須 ≥3.8),驗容 Docker 守護,閱日察初誤。

第二步:以 Prefect 建 ML 管道

建 Prefect flow,各階為 task。

# prefect_ml_pipeline.py
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
import pandas as pd
import mlflow
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
# ... (see EXAMPLES.md for complete implementation)

部並排:

# deploy_prefect.py
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule
from prefect_ml_pipeline import ml_training_pipeline

# Create deployment with schedule
deployment = Deployment.build_from_flow(
    flow=ml_training_pipeline,
# ... (see EXAMPLES.md for complete implementation)

得: Prefect flow 諸 task 依序行,task 失敗自重試,成行於 UI 現綠,MLflow 記實驗,模型已註並部。

敗則: 察 task 依正定,驗 MLflow 服可達,確據源徑正,察循依,驗 task 超時,閱 Prefect 日誌詳誤,察資(記/CPU)。

第三步:以 Airflow 建 ML 管道

建 Airflow DAG 為生產 ML 流。

# dags/ml_training_dag.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta
import mlflow
import pandas as pd
# ... (see EXAMPLES.md for complete implementation)

得: DAG 現於 Airflow UI,排行按時,task 失敗觸重試與警,XCom 傳據於 task 間,MLflow 整合記實驗。

敗則: 察 DAG 文語法(python dags/ml_training_dag.py),驗入於 Airflow 環可得,確 XCom 不逾大限(大據用文徑),察郵設供警,驗排程行,閱 Airflow UI 中 task 日。

第四步:行進階能

加動 DAG、分支、平行行。

# advanced_pipeline.py (Prefect)
from prefect import flow, task
from prefect.task_runners import DaskTaskRunner, ConcurrentTaskRunner
import time

@task
def process_shard(shard_id: int, data: list) -> dict:
    """Process data shard in parallel."""
# ... (see EXAMPLES.md for complete implementation)

Airflow 分支:

# Airflow branching with BranchPythonOperator
from airflow.operators.python import BranchPythonOperator

def check_data_quality(**context):
    """Decide which branch to take."""
    data_path = context['ti'].xcom_pull(key='data_path')
    df = pd.read_csv(data_path)

# ... (see EXAMPLES.md for complete implementation)

得: 平行 task 並發行(管道更快),條件分支依邏行,動 task 生行,Dask 群分工。

敗則: 察 Dask 群已設可達,驗 task_runner 已指,確分支返有效 task ID,察平行 task 之資爭,驗條件邏正。

第五步:合察與警

加全察與失敗通。

# monitoring_integration.py
from prefect.blocks.notifications import SlackWebhook
from prefect import flow, task, get_run_logger
from prefect.context import FlowRunContext

@task(on_failure=[notify_failure])
def critical_task():
    """Task with failure notification."""
# ... (see EXAMPLES.md for complete implementation)

Airflow 之 sensor 察:

# Airflow SLA and monitoring
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.decorators import apply_defaults

default_args = {
    'sla': timedelta(hours=4),  # Alert if task exceeds 4 hours
    'on_failure_callback': slack_alert_failure,
    'on_success_callback': slack_alert_success,
# ... (see EXAMPLES.md for complete implementation)

得: Slack/郵通於失敗送,SLA 違觸警,自度量追,日誌聚於察系。

敗則: 驗 Slack webhook 正設,察郵 SMTP 設,確通塊已正載,驗 SLA 值合理,察阻通之網問。

第六步:行管道之 CI/CD

版控並自動管道部。

# .github/workflows/deploy-pipeline.yml
name: Deploy ML Pipeline

on:
  push:
    branches: [main]
    paths:
      - 'pipelines/**'
# ... (see EXAMPLES.md for complete implementation)

得: 部前管試過,自動部生產,成部告隊,管版於 Git 追。

敗則: 察試覆與失敗,驗 Prefect Cloud 憑,確部腳本處誤,察 Slack webhook 設,閱 CI 日察部誤。

  • 編框已裝且行
  • 管道 DAG 定,依正
  • 諸 task 依正序行
  • 失敗時重試邏輯正
  • 排行按時行
  • MLflow 整合記實驗
  • DVC 整合記據版本
  • 平行 task 並發行
  • 條件分支正
  • 察與警可行
  • CI/CD 自動部
  • 管道跨環可復

  • 循依:A 依 B,B 依 A——慎設 DAG 構,用 Airflow/Prefect 驗器
  • 記漏:長 task 累記——設 task 超時,察資用,定期重啟工
  • XCom 大限:以 XCom 傳大據——用文徑或外存(S3)非直序
  • 時區混:誤時排——必用 UTC,明設排程時區
  • 無重試:暫誤致 task 永敗——設指數退之重試
  • 緊耦:task 直依實現——用清接,明傳參
  • 無冪等:重行致重複或誤——設冪等之 task(重試安)
  • 誤處差:失敗無有用脈——加詳日誌,正捕異
  • 資爭:平行 task 壓資——限並發,設資配額
  • 版衝:諸 task 須不容依——用 Docker 容隔離

  • track-ml-experiments - 合 MLflow 追於管道 task
  • version-ml-data - 用 DVC 為管中據版本
  • build-feature-store - 物化特徵為管 task
  • deploy-ml-model-serving - 加部為終管階
  • deploy-to-kubernetes - 於 Kubernetes 行編管道

GitHub 저장소

pjt222/agent-almanac
경로: i18n/wenyan/skills/orchestrate-ml-pipeline
0
agentsagentskillsai-assisted-developmentclaude-codeskillsteams

연관 스킬

llamaguard

기타

LlamaGuard는 폭력 및 혐오 발언 등 6가지 안전 범주에서 LLM 입력과 출력을 조정하기 위한 Meta의 70-80억 파라미터 모델입니다. 94-95% 정확도를 제공하며 vLLM, Hugging Face 또는 Amazon SageMaker를 사용해 배포할 수 있습니다. 이 기술을 사용하여 AI 애플리케이션에 콘텐츠 필터링 및 안전 가드레일을 손쉽게 통합하세요.

스킬 보기

cost-optimization

기타

이 Claude Skill은 리소스 적정화, 태깅 전략, 지출 분석을 통해 개발자들이 클라우드 비용을 최적화할 수 있도록 지원합니다. AWS, Azure, GCP에서 클라우드 비용을 절감하고 비용 거버넌스를 구현하기 위한 프레임워크를 제공합니다. 인프라 비용을 분석하거나, 리소스를 적정화하거나, 예산 제약을 충족해야 할 때 사용하세요.

스킬 보기

quantizing-models-bitsandbytes

기타

이 스킬은 bitsandbytes를 사용하여 LLM을 8비트 또는 4비트 정밀도로 양자화하며, 최소한의 정확도 손실로 50-75%의 메모리 감소를 달성합니다. 제한된 GPU 메모리에서 더 큰 모델을 실행하거나 추론을 가속화하는 데 이상적이며, INT8, NF4, FP4와 같은 형식을 지원합니다. 이 스킬은 HuggingFace Transformers와 통합되어 QLoRA 학습 및 8비트 옵티마이저를 가능하게 합니다.

스킬 보기

dispatching-parallel-agents

기타

이 Claude Skill은 3개 이상의 독립적인 문제를 동시에 조사하고 해결하기 위해 다중 에이전트를 배치합니다. 공유 상태나 의존성 없이 해결 가능한 무관련 장애 시나리오에 맞게 설계되었습니다. 핵심 기능은 병렬 문제 해결로, 각 독립 문제 영역마다 하나의 에이전트를 할당하여 효율성을 극대화합니다.

스킬 보기