orchestrate-ml-pipeline
정보
이 스킬은 Prefect나 Airflow를 사용하여 엔드투엔드 머신러닝 파이프라인을 조율하며, DAG 구성, 태스크 의존성, 스케줄링 및 재시도 로직을 처리합니다. MLflow와 DVC 같은 ML 도구와 통합되어 프로덕션 워크플로를 지원합니다. 데이터 수집부터 배포까지의 다단계 ML 프로세스를 자동화하거나 예약된 재학습 파이프라인을 관리할 때 사용하세요.
빠른 설치
Claude Code
추천npx skills add pjt222/agent-almanac -a claude-code/plugin add https://github.com/pjt222/agent-almanacgit clone https://github.com/pjt222/agent-almanac.git ~/.claude/skills/orchestrate-ml-pipelineClaude Code에서 이 명령을 복사하여 붙여넣어 스킬을 설치하세요
문서
編排 ML 管道
完整配置檔與範本詳見 Extended Examples。
以依賴管理、排程與監測,建立並編排端到端之機器學習管道。
適用時機
- 自動化由資料攝取至部署之多步 ML 工作流
- 對新資料排程定期模型重訓
- 協調分散式資料處理與訓練任務
- 實施 ML 管道階段間之複雜依賴
- 管理重試邏輯與失敗恢復
- 監測管道執行並對失敗告警
- 編排特徵工程、訓練、評估與部署
- 建立跨環境之可重現 ML 工作流
輸入
- 必要:ML 管道組件(資料攝取、預處理、訓練、評估)
- 必要:編排框架之擇(Prefect、Airflow、Kubeflow)
- 必要:已裝編排函式庫之 Python 環境
- 選擇性:分散式執行之 Kubernetes 集群
- 選擇性:實驗記錄之 MLflow 追蹤伺服器
- 選擇性:資料版本控制之 DVC
- 選擇性:告警之 Slack/email
- 選擇性:監測基礎設施(Prometheus、Grafana)
步驟
步驟一:擇並裝編排框架
擇適框架並建基礎設施。
# Option 1: Prefect (modern, Pythonic, simpler)
pip install prefect
pip install prefect-aws prefect-dask prefect-docker
# Start Prefect server (local development)
prefect server start
# Or use Prefect Cloud (managed)
# ... (see EXAMPLES.md for complete implementation)
Airflow 之 Docker Compose:
# docker-compose.airflow.yml
version: '3.8'
x-airflow-common: &airflow-common
image: apache/airflow:2.8.0
environment:
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
# ... (see EXAMPLES.md for complete implementation)
預期: 編排框架已裝,網頁 UI 可達(Prefect 於 http://localhost:4200,Airflow 於 http://localhost:8080),資料庫已初始化,排程器運行中。
失敗時: 查埠可用性(netstat -tulpn | grep 8080),驗資料庫連接,確 Redis 為 Celery 運行,查 Python 版本相容性(Airflow 需 ≥3.8),驗 Docker 守護程序為容器化設置,覽日誌中之初始化錯誤。
步驟二:以 Prefect 建 ML 管道
為每管道階段建附任務之 Prefect 流。
# prefect_ml_pipeline.py
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
import pandas as pd
import mlflow
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
# ... (see EXAMPLES.md for complete implementation)
部署並排程:
# deploy_prefect.py
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule
from prefect_ml_pipeline import ml_training_pipeline
# Create deployment with schedule
deployment = Deployment.build_from_flow(
flow=ml_training_pipeline,
# ... (see EXAMPLES.md for complete implementation)
預期: Prefect 流以正確順序執行所有任務,任務失敗自動觸發重試,成功運行於 UI 中顯綠,MLflow 記錄實驗,模型註冊並部署。
失敗時: 查任務依賴定義正確,驗 MLflow 伺服器可達,確資料來源路徑正確,查循環依賴,驗任務逾時上限,覽 Prefect 日誌中之詳錯,查資源可用性(記憶體/CPU)。
步驟三:以 Airflow 建 ML 管道
為生產 ML 工作流建 Airflow DAG。
# dags/ml_training_dag.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta
import mlflow
import pandas as pd
# ... (see EXAMPLES.md for complete implementation)
預期: DAG 現於 Airflow UI,排程運行按時執行,任務失敗觸發重試與告警,XCom 於任務間傳資料,MLflow 整合記錄實驗。
失敗時: 查 DAG 檔語法(python dags/ml_training_dag.py),驗匯入於 Airflow 環境中可得,確 XCom 不逾大小上限(大資料用檔路徑),查告警之電郵配置,驗排程器運行,覽 Airflow UI 中之任務日誌。
步驟四:實施進階特性
加動態 DAG、分支與並行執行。
# advanced_pipeline.py (Prefect)
from prefect import flow, task
from prefect.task_runners import DaskTaskRunner, ConcurrentTaskRunner
import time
@task
def process_shard(shard_id: int, data: list) -> dict:
"""Process data shard in parallel."""
# ... (see EXAMPLES.md for complete implementation)
Airflow 分支:
# Airflow branching with BranchPythonOperator
from airflow.operators.python import BranchPythonOperator
def check_data_quality(**context):
"""Decide which branch to take."""
data_path = context['ti'].xcom_pull(key='data_path')
df = pd.read_csv(data_path)
# ... (see EXAMPLES.md for complete implementation)
預期: 並行任務同步執行(管道更快),條件分支依邏輯執行,動態任務生成可運作,Dask 集群分配工作。
失敗時: 查 Dask 集群已配且可達,驗 task_runner 已指定,確分支返有效任務 ID,查並行任務之資源爭用,驗條件邏輯正確。
步驟五:整合監測與告警
加全面監測與失敗通知。
# monitoring_integration.py
from prefect.blocks.notifications import SlackWebhook
from prefect import flow, task, get_run_logger
from prefect.context import FlowRunContext
@task(on_failure=[notify_failure])
def critical_task():
"""Task with failure notification."""
# ... (see EXAMPLES.md for complete implementation)
Airflow 之感應器監測:
# Airflow SLA and monitoring
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
default_args = {
'sla': timedelta(hours=4), # Alert if task exceeds 4 hours
'on_failure_callback': slack_alert_failure,
'on_success_callback': slack_alert_success,
# ... (see EXAMPLES.md for complete implementation)
預期: 失敗時送 Slack/email 通知,SLA 違反觸發告警,自定指標已追蹤,日誌於監測系統中聚合。
失敗時: 驗 Slack webhook 已正確配,查電郵 SMTP 設定,確通知區塊正確載入,驗 SLA 值合理,查阻通知之網路問題。
步驟六:為管道實施 CI/CD
版本控制並自動化管道部署。
# .github/workflows/deploy-pipeline.yml
name: Deploy ML Pipeline
on:
push:
branches: [main]
paths:
- 'pipelines/**'
# ... (see EXAMPLES.md for complete implementation)
預期: 部署前管道測試通過,自動部署至生產,成功部署團隊獲通知,管道版本於 Git 中追蹤。
失敗時: 查測試覆蓋與失敗,驗 Prefect Cloud 憑證,確部署腳本處理錯誤,查 Slack webhook 配置,覽 CI 日誌中之部署錯誤。
驗證
- 編排框架已裝且運行
- 管道 DAG 已定義且依賴正確
- 所有任務按正確順序執行
- 重試邏輯於失敗時正常運作
- 排程運行按時執行
- MLflow 整合記錄實驗
- DVC 整合版本化資料
- 並行任務同步執行
- 條件分支正確運作
- 監測與告警運作
- CI/CD 管道自動部署
- 管道跨環境可重現
常見陷阱
- 循環依賴:任務 A 依 B,B 依 A——當細設 DAG 結構,用 Airflow/Prefect 驗證器
- 記憶體洩漏:長運行任務累積記憶體——設任務逾時,監測資源使用,定期重啟工作者
- XCom 大小上限:透過 XCom 傳大資料——用檔路徑或外部儲存(S3)而非直接序列化
- 時區混淆:排程於錯誤時間運行——恆用 UTC,於排程中明設時區
- 缺重試:任務於暫態錯誤上永久失敗——以指數退避配置重試
- 緊耦合:任務直接依實作細節——用清晰介面,明傳參數
- 無冪等:重運行任務致重複或錯誤——設計任務為冪等(重試安全)
- 錯誤處理差:失敗未提供有用上下文——加詳記錄,妥當捕獲例外
- 資源爭用:並行任務壓垮資源——限並發,設資源配額
- 版本衝突:不同任務需不相容之依賴——用 Docker 容器以隔離任務
相關技能
track-ml-experiments- 將 MLflow 追蹤整合於管道任務version-ml-data- 用 DVC 為管道中之資料版本化build-feature-store- 將特徵實體化為管道任務deploy-ml-model-serving- 加部署為最終管道階段deploy-to-kubernetes- 於 Kubernetes 上運行編排管道
GitHub 저장소
연관 스킬
llamaguard
기타LlamaGuard는 폭력 및 혐오 발언 등 6가지 안전 범주에서 LLM 입력과 출력을 조정하기 위한 Meta의 70-80억 파라미터 모델입니다. 94-95% 정확도를 제공하며 vLLM, Hugging Face 또는 Amazon SageMaker를 사용해 배포할 수 있습니다. 이 기술을 사용하여 AI 애플리케이션에 콘텐츠 필터링 및 안전 가드레일을 손쉽게 통합하세요.
cost-optimization
기타이 Claude Skill은 리소스 적정화, 태깅 전략, 지출 분석을 통해 개발자들이 클라우드 비용을 최적화할 수 있도록 지원합니다. AWS, Azure, GCP에서 클라우드 비용을 절감하고 비용 거버넌스를 구현하기 위한 프레임워크를 제공합니다. 인프라 비용을 분석하거나, 리소스를 적정화하거나, 예산 제약을 충족해야 할 때 사용하세요.
quantizing-models-bitsandbytes
기타이 스킬은 bitsandbytes를 사용하여 LLM을 8비트 또는 4비트 정밀도로 양자화하며, 최소한의 정확도 손실로 50-75%의 메모리 감소를 달성합니다. 제한된 GPU 메모리에서 더 큰 모델을 실행하거나 추론을 가속화하는 데 이상적이며, INT8, NF4, FP4와 같은 형식을 지원합니다. 이 스킬은 HuggingFace Transformers와 통합되어 QLoRA 학습 및 8비트 옵티마이저를 가능하게 합니다.
dispatching-parallel-agents
기타이 Claude Skill은 3개 이상의 독립적인 문제를 동시에 조사하고 해결하기 위해 다중 에이전트를 배치합니다. 공유 상태나 의존성 없이 해결 가능한 무관련 장애 시나리오에 맞게 설계되었습니다. 핵심 기능은 병렬 문제 해결로, 각 독립 문제 영역마다 하나의 에이전트를 할당하여 효율성을 극대화합니다.
