orchestrate-ml-pipeline
정보
이 스킬은 Prefect나 Airflow를 사용하여 종단간(end-to-end) 머신러닝 파이프라인을 조정하며, DAG 구성, 작업 의존성, 스케줄링, 재시도 로직을 처리합니다. MLflow, DVC, 피처 스토어와 통합하여 프로덕션 워크플로우를 지원합니다. 데이터 수집부터 배포까지 다단계 ML 프로세스를 자동화하거나, 예약된 재학습 및 장애 복구를 관리할 때 사용하세요.
빠른 설치
Claude Code
추천npx skills add pjt222/agent-almanac -a claude-code/plugin add https://github.com/pjt222/agent-almanacgit clone https://github.com/pjt222/agent-almanac.git ~/.claude/skills/orchestrate-ml-pipelineClaude Code에서 이 명령을 복사하여 붙여넣어 스킬을 설치하세요
문서
編 ML 管道
詳例見 Extended Examples。
以依管、排程、與察建並編端至端 ML 管道。
用時
- 自動多步 ML 流自據入至部署乃用
- 新據上排定期模型重訓乃用
- 分布據處與訓任務之協調乃用
- 行 ML 管階間複依乃用
- 重試邏輯與失敗復管乃用
- 察管道行與失敗警乃用
- 編特徵工、訓、評、部之諸階乃用
- 跨環境建可復 ML 流乃用
入
- 必要:ML 管件(據入、預處、訓、評)
- 必要:編框選(Prefect、Airflow、Kubeflow)
- 必要:裝編庫之 Python 環
- 可選:Kubernetes 群為分布行
- 可選:MLflow 追蹤服以記實驗
- 可選:DVC 為據版本
- 可選:Slack/郵以警
- 可選:察基(Prometheus、Grafana)
法
第一步:選並裝編框
擇宜框並設基設。
# Option 1: Prefect (modern, Pythonic, simpler)
pip install prefect
pip install prefect-aws prefect-dask prefect-docker
# Start Prefect server (local development)
prefect server start
# Or use Prefect Cloud (managed)
# ... (see EXAMPLES.md for complete implementation)
Airflow 之 Docker Compose:
# docker-compose.airflow.yml
version: '3.8'
x-airflow-common: &airflow-common
image: apache/airflow:2.8.0
environment:
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
# ... (see EXAMPLES.md for complete implementation)
得: 編框已裝,網 UI 可達(Prefect 於 http://localhost:4200,Airflow 於 http://localhost:8080),庫已初,排程行。
敗則: 察端可用(netstat -tulpn | grep 8080),驗庫連,確 Celery 之 Redis 行,察 Python 版合(Airflow 須 ≥3.8),驗容 Docker 守護,閱日察初誤。
第二步:以 Prefect 建 ML 管道
建 Prefect flow,各階為 task。
# prefect_ml_pipeline.py
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
import pandas as pd
import mlflow
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
# ... (see EXAMPLES.md for complete implementation)
部並排:
# deploy_prefect.py
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule
from prefect_ml_pipeline import ml_training_pipeline
# Create deployment with schedule
deployment = Deployment.build_from_flow(
flow=ml_training_pipeline,
# ... (see EXAMPLES.md for complete implementation)
得: Prefect flow 諸 task 依序行,task 失敗自重試,成行於 UI 現綠,MLflow 記實驗,模型已註並部。
敗則: 察 task 依正定,驗 MLflow 服可達,確據源徑正,察循依,驗 task 超時,閱 Prefect 日誌詳誤,察資(記/CPU)。
第三步:以 Airflow 建 ML 管道
建 Airflow DAG 為生產 ML 流。
# dags/ml_training_dag.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta
import mlflow
import pandas as pd
# ... (see EXAMPLES.md for complete implementation)
得: DAG 現於 Airflow UI,排行按時,task 失敗觸重試與警,XCom 傳據於 task 間,MLflow 整合記實驗。
敗則: 察 DAG 文語法(python dags/ml_training_dag.py),驗入於 Airflow 環可得,確 XCom 不逾大限(大據用文徑),察郵設供警,驗排程行,閱 Airflow UI 中 task 日。
第四步:行進階能
加動 DAG、分支、平行行。
# advanced_pipeline.py (Prefect)
from prefect import flow, task
from prefect.task_runners import DaskTaskRunner, ConcurrentTaskRunner
import time
@task
def process_shard(shard_id: int, data: list) -> dict:
"""Process data shard in parallel."""
# ... (see EXAMPLES.md for complete implementation)
Airflow 分支:
# Airflow branching with BranchPythonOperator
from airflow.operators.python import BranchPythonOperator
def check_data_quality(**context):
"""Decide which branch to take."""
data_path = context['ti'].xcom_pull(key='data_path')
df = pd.read_csv(data_path)
# ... (see EXAMPLES.md for complete implementation)
得: 平行 task 並發行(管道更快),條件分支依邏行,動 task 生行,Dask 群分工。
敗則: 察 Dask 群已設可達,驗 task_runner 已指,確分支返有效 task ID,察平行 task 之資爭,驗條件邏正。
第五步:合察與警
加全察與失敗通。
# monitoring_integration.py
from prefect.blocks.notifications import SlackWebhook
from prefect import flow, task, get_run_logger
from prefect.context import FlowRunContext
@task(on_failure=[notify_failure])
def critical_task():
"""Task with failure notification."""
# ... (see EXAMPLES.md for complete implementation)
Airflow 之 sensor 察:
# Airflow SLA and monitoring
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
default_args = {
'sla': timedelta(hours=4), # Alert if task exceeds 4 hours
'on_failure_callback': slack_alert_failure,
'on_success_callback': slack_alert_success,
# ... (see EXAMPLES.md for complete implementation)
得: Slack/郵通於失敗送,SLA 違觸警,自度量追,日誌聚於察系。
敗則: 驗 Slack webhook 正設,察郵 SMTP 設,確通塊已正載,驗 SLA 值合理,察阻通之網問。
第六步:行管道之 CI/CD
版控並自動管道部。
# .github/workflows/deploy-pipeline.yml
name: Deploy ML Pipeline
on:
push:
branches: [main]
paths:
- 'pipelines/**'
# ... (see EXAMPLES.md for complete implementation)
得: 部前管試過,自動部生產,成部告隊,管版於 Git 追。
敗則: 察試覆與失敗,驗 Prefect Cloud 憑,確部腳本處誤,察 Slack webhook 設,閱 CI 日察部誤。
驗
- 編框已裝且行
- 管道 DAG 定,依正
- 諸 task 依正序行
- 失敗時重試邏輯正
- 排行按時行
- MLflow 整合記實驗
- DVC 整合記據版本
- 平行 task 並發行
- 條件分支正
- 察與警可行
- CI/CD 自動部
- 管道跨環可復
陷
- 循依:A 依 B,B 依 A——慎設 DAG 構,用 Airflow/Prefect 驗器
- 記漏:長 task 累記——設 task 超時,察資用,定期重啟工
- XCom 大限:以 XCom 傳大據——用文徑或外存(S3)非直序
- 時區混:誤時排——必用 UTC,明設排程時區
- 無重試:暫誤致 task 永敗——設指數退之重試
- 緊耦:task 直依實現——用清接,明傳參
- 無冪等:重行致重複或誤——設冪等之 task(重試安)
- 誤處差:失敗無有用脈——加詳日誌,正捕異
- 資爭:平行 task 壓資——限並發,設資配額
- 版衝:諸 task 須不容依——用 Docker 容隔離
參
track-ml-experiments- 合 MLflow 追於管道 taskversion-ml-data- 用 DVC 為管中據版本build-feature-store- 物化特徵為管 taskdeploy-ml-model-serving- 加部為終管階deploy-to-kubernetes- 於 Kubernetes 行編管道
GitHub 저장소
연관 스킬
llamaguard
기타LlamaGuard는 폭력 및 혐오 발언 등 6가지 안전 범주에서 LLM 입력과 출력을 조정하기 위한 Meta의 70-80억 파라미터 모델입니다. 94-95% 정확도를 제공하며 vLLM, Hugging Face 또는 Amazon SageMaker를 사용해 배포할 수 있습니다. 이 기술을 사용하여 AI 애플리케이션에 콘텐츠 필터링 및 안전 가드레일을 손쉽게 통합하세요.
cost-optimization
기타이 Claude Skill은 리소스 적정화, 태깅 전략, 지출 분석을 통해 개발자들이 클라우드 비용을 최적화할 수 있도록 지원합니다. AWS, Azure, GCP에서 클라우드 비용을 절감하고 비용 거버넌스를 구현하기 위한 프레임워크를 제공합니다. 인프라 비용을 분석하거나, 리소스를 적정화하거나, 예산 제약을 충족해야 할 때 사용하세요.
quantizing-models-bitsandbytes
기타이 스킬은 bitsandbytes를 사용하여 LLM을 8비트 또는 4비트 정밀도로 양자화하며, 최소한의 정확도 손실로 50-75%의 메모리 감소를 달성합니다. 제한된 GPU 메모리에서 더 큰 모델을 실행하거나 추론을 가속화하는 데 이상적이며, INT8, NF4, FP4와 같은 형식을 지원합니다. 이 스킬은 HuggingFace Transformers와 통합되어 QLoRA 학습 및 8비트 옵티마이저를 가능하게 합니다.
dispatching-parallel-agents
기타이 Claude Skill은 3개 이상의 독립적인 문제를 동시에 조사하고 해결하기 위해 다중 에이전트를 배치합니다. 공유 상태나 의존성 없이 해결 가능한 무관련 장애 시나리오에 맞게 설계되었습니다. 핵심 기능은 병렬 문제 해결로, 각 독립 문제 영역마다 하나의 에이전트를 할당하여 효율성을 극대화합니다.
