orchestrate-ml-pipeline
정보
이 스킬은 Prefect나 Airflow를 사용하여 엔드투엔드 ML 파이프라인을 조율하며, DAG 구성, 태스크 의존성, 재시도 로직, 스케줄링 및 모니터링을 처리합니다. MLflow, DVC, 피처 스토어와 통합되어 프로덕션 워크플로를 지원합니다. 데이터 수집부터 배포까지 다단계 워크플로 자동화, 모델 재학습 스케줄링, 분산 태스크 조정, 장애 복구 관리에 활용하세요.
빠른 설치
Claude Code
추천npx skills add pjt222/agent-almanac -a claude-code/plugin add https://github.com/pjt222/agent-almanacgit clone https://github.com/pjt222/agent-almanac.git ~/.claude/skills/orchestrate-ml-pipelineClaude Code에서 이 명령을 복사하여 붙여넣어 스킬을 설치하세요
문서
MLパイプラインのオーケストレーション
完全な設定ファイルとテンプレートについてはExtended Examplesを参照。
依存関係管理、スケジューリング、モニタリングを備えたエンドツーエンドの機械学習パイプラインを構築しオーケストレーションする。
使用タイミング
- データ取り込みからデプロイまでの複数ステップのMLワークフローを自動化する時
- 新しいデータでの定期的なモデル再トレーニングをスケジュールする時
- 分散データ処理とトレーニングタスクを調整する時
- MLパイプラインステージ間の複雑な依存関係を実装する時
- リトライロジックと障害復旧を管理する時
- パイプライン実行のモニタリングと障害時のアラートを行う時
- 特徴エンジニアリング、トレーニング、評価、デプロイをオーケストレーションする時
- 環境間で再現可能なMLワークフローを構築する時
入力
- 必須: MLパイプラインコンポーネント(データ取り込み、前処理、トレーニング、評価)
- 必須: オーケストレーションフレームワークの選択(Prefect、Airflow、Kubeflow)
- 必須: オーケストレーションライブラリがインストールされたPython環境
- 任意: 分散実行用のKubernetesクラスター
- 任意: 実験ログ用のMLflowトラッキングサーバー
- 任意: データバージョニング用のDVC
- 任意: アラート用のSlack/メール
- 任意: モニタリングインフラストラクチャ(Prometheus、Grafana)
手順
ステップ1: オーケストレーションフレームワークの選択とインストール
適切なフレームワークを選択し、インフラストラクチャをセットアップする。
# Option 1: Prefect (modern, Pythonic, simpler)
pip install prefect
pip install prefect-aws prefect-dask prefect-docker
# Start Prefect server (local development)
prefect server start
# Or use Prefect Cloud (managed)
# ... (see EXAMPLES.md for complete implementation)
Airflow用のDocker Compose:
# docker-compose.airflow.yml
version: '3.8'
x-airflow-common: &airflow-common
image: apache/airflow:2.8.0
environment:
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
# ... (see EXAMPLES.md for complete implementation)
期待結果: オーケストレーションフレームワークがインストールされ、Web UI(Prefectは http://localhost:4200、Airflowは http://localhost:8080)にアクセスでき、データベースが初期化され、スケジューラーが実行されていること。
失敗時: ポートの利用可能性を確認する(netstat -tulpn | grep 8080)、データベース接続を確認する、CeleryにRedisが実行中であることを確認する、Pythonバージョンの互換性を確認する(Airflowは≥3.8が必要)、コンテナ化セットアップ用にDockerデーモンを確認する、初期化エラーのログを検査する。
ステップ2: PrefectでMLパイプラインを構築する
各パイプラインステージのタスクを持つPrefectフローを作成する。
# prefect_ml_pipeline.py
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
import pandas as pd
import mlflow
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
# ... (see EXAMPLES.md for complete implementation)
デプロイとスケジュール:
# deploy_prefect.py
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule
from prefect_ml_pipeline import ml_training_pipeline
# Create deployment with schedule
deployment = Deployment.build_from_flow(
flow=ml_training_pipeline,
# ... (see EXAMPLES.md for complete implementation)
期待結果: Prefectフローがすべてのタスクを正しい順序で実行し、タスクの失敗が自動的にリトライをトリガーし、成功した実行がUIでグリーンに表示され、MLflowが実験をログし、モデルが登録されデプロイされること。
失敗時: タスク依存関係が正しく定義されているか確認する、MLflowサーバーにアクセスできるか確認する、データソースパスが正しいか確認する、循環依存関係がないか確認する、タスクタイムアウト制限を確認する、詳細なエラーのためにPrefectログを検査する、リソースの利用可能性(メモリ/CPU)を確認する。
ステップ3: AirflowでMLパイプラインを構築する
本番MLワークフロー用のAirflow DAGを作成する。
# dags/ml_training_dag.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta
import mlflow
import pandas as pd
# ... (see EXAMPLES.md for complete implementation)
期待結果: DAGがAirflow UIに表示され、スケジュールされた実行が時間通りに実行され、タスクの失敗がリトライとアラートをトリガーし、XComがタスク間でデータを渡し、MLflow統合が実験をログすること。
失敗時: DAGファイルの構文を確認する(python dags/ml_training_dag.py)、Airflow環境でインポートが利用可能か確認する、XComがサイズ制限を超えていないか確認する(大きなデータにはファイルパスを使用する)、アラートのメール設定を確認する、スケジューラーが実行中か確認する、Airflow UIでタスクログを検査する。
ステップ4: 高度な機能の実装
動的DAG、分岐、並列実行を追加する。
# advanced_pipeline.py (Prefect)
from prefect import flow, task
from prefect.task_runners import DaskTaskRunner, ConcurrentTaskRunner
import time
@task
def process_shard(shard_id: int, data: list) -> dict:
"""Process data shard in parallel."""
# ... (see EXAMPLES.md for complete implementation)
Airflowの分岐:
# Airflow branching with BranchPythonOperator
from airflow.operators.python import BranchPythonOperator
def check_data_quality(**context):
"""Decide which branch to take."""
data_path = context['ti'].xcom_pull(key='data_path')
df = pd.read_csv(data_path)
# ... (see EXAMPLES.md for complete implementation)
期待結果: 並列タスクが同時に実行される(より高速なパイプライン)、条件分岐がロジックに基づいて実行される、動的タスク生成が機能する、Daskクラスターが作業を分散すること。
失敗時: Daskクラスターが設定されアクセス可能か確認する、task_runnerが指定されているか確認する、分岐が有効なタスクIDを返すか確認する、並列タスクのリソース競合を確認する、条件ロジックの正確性を確認する。
ステップ5: モニタリングとアラートの統合
包括的なモニタリングと障害通知を追加する。
# monitoring_integration.py
from prefect.blocks.notifications import SlackWebhook
from prefect import flow, task, get_run_logger
from prefect.context import FlowRunContext
@task(on_failure=[notify_failure])
def critical_task():
"""Task with failure notification."""
# ... (see EXAMPLES.md for complete implementation)
センサーを使用したAirflowモニタリング:
# Airflow SLA and monitoring
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
default_args = {
'sla': timedelta(hours=4), # Alert if task exceeds 4 hours
'on_failure_callback': slack_alert_failure,
'on_success_callback': slack_alert_success,
# ... (see EXAMPLES.md for complete implementation)
期待結果: 障害時にSlack/メール通知が送信される、SLA違反がアラートをトリガーする、カスタムメトリクスが追跡される、ログがモニタリングシステムに集約されること。
失敗時: Slack Webhookが正しく設定されているか確認する、メールSMTP設定を確認する、通知ブロックが適切にロードされているか確認する、SLA値が合理的か確認する、通知をブロックしているネットワーク問題がないか確認する。
ステップ6: パイプライン用CI/CDの実装
パイプラインデプロイメントのバージョン管理と自動化を行う。
# .github/workflows/deploy-pipeline.yml
name: Deploy ML Pipeline
on:
push:
branches: [main]
paths:
- 'pipelines/**'
# ... (see EXAMPLES.md for complete implementation)
期待結果: デプロイ前にパイプラインテストが合格する、本番への自動デプロイ、成功したデプロイ時にチームに通知、パイプラインのバージョニングがGitで追跡されること。
失敗時: テストカバレッジと失敗を確認する、Prefect Cloudの認証情報を確認する、デプロイスクリプトがエラーを処理しているか確認する、Slack Webhook設定を確認する、デプロイエラーのCIログを検査する。
バリデーション
- オーケストレーションフレームワークがインストールされ実行されている
- パイプラインDAGが正しい依存関係で定義されている
- すべてのタスクが適切な順序で実行される
- 障害時にリトライロジックが正しく機能する
- スケジュールされた実行が時間通りに実行される
- MLflow統合が実験をログする
- DVC統合がデータをバージョン管理する
- 並列タスクが同時に実行される
- 条件分岐が正しく動作する
- モニタリングとアラートが機能している
- CI/CDパイプラインが自動的にデプロイする
- パイプラインが環境間で再現可能である
よくある落とし穴
- 循環依存関係: タスクAがBに依存し、BがAに依存する — DAG構造を慎重に設計し、Airflow/Prefectのバリデーターを使用する
- メモリリーク: 長時間実行されるタスクがメモリを蓄積する — タスクタイムアウトを設定し、リソース使用量をモニタリングし、ワーカーを定期的に再起動する
- XComサイズ制限: XCom経由で大きなデータを渡す — 直接シリアライゼーションの代わりにファイルパスまたは外部ストレージ(S3)を使用する
- タイムゾーンの混乱: 間違った時間に実行がスケジュールされる — 常にUTCを使用し、スケジュールにタイムゾーンを明示的に設定する
- リトライの欠如: 一時的なエラーでタスクが永続的に失敗する — 指数バックオフ付きのリトライを設定する
- 密結合: タスクが実装の詳細に直接依存する — 明確なインターフェースを使用し、パラメータを明示的に渡す
- 冪等性の欠如: タスクの再実行が重複やエラーを引き起こす — タスクを冪等に設計する(リトライしても安全)
- 不十分なエラー処理: 障害が有用なコンテキストを提供しない — 詳細なログを追加し、例外を適切にキャプチャする
- リソース競合: 並列タスクがリソースを圧迫する — 同時実行数を制限し、リソースクォータを設定する
- バージョン競合: 異なるタスクが互換性のない依存関係を必要とする — タスクの分離にDockerコンテナを使用する
関連スキル
track-ml-experiments— パイプラインタスクにMLflowトラッキングを統合するversion-ml-data— パイプラインでデータバージョニングにDVCを使用するbuild-feature-store— パイプラインタスクとして特徴を具現化するdeploy-ml-model-serving— 最終パイプラインステージとしてデプロイメントを追加するdeploy-to-kubernetes— Kubernetes上でオーケストレーションされたパイプラインを実行する
GitHub 저장소
연관 스킬
llamaguard
기타LlamaGuard는 폭력 및 혐오 발언 등 6가지 안전 범주에서 LLM 입력과 출력을 조정하기 위한 Meta의 70-80억 파라미터 모델입니다. 94-95% 정확도를 제공하며 vLLM, Hugging Face 또는 Amazon SageMaker를 사용해 배포할 수 있습니다. 이 기술을 사용하여 AI 애플리케이션에 콘텐츠 필터링 및 안전 가드레일을 손쉽게 통합하세요.
cost-optimization
기타이 Claude Skill은 리소스 적정화, 태깅 전략, 지출 분석을 통해 개발자들이 클라우드 비용을 최적화할 수 있도록 지원합니다. AWS, Azure, GCP에서 클라우드 비용을 절감하고 비용 거버넌스를 구현하기 위한 프레임워크를 제공합니다. 인프라 비용을 분석하거나, 리소스를 적정화하거나, 예산 제약을 충족해야 할 때 사용하세요.
quantizing-models-bitsandbytes
기타이 스킬은 bitsandbytes를 사용하여 LLM을 8비트 또는 4비트 정밀도로 양자화하며, 최소한의 정확도 손실로 50-75%의 메모리 감소를 달성합니다. 제한된 GPU 메모리에서 더 큰 모델을 실행하거나 추론을 가속화하는 데 이상적이며, INT8, NF4, FP4와 같은 형식을 지원합니다. 이 스킬은 HuggingFace Transformers와 통합되어 QLoRA 학습 및 8비트 옵티마이저를 가능하게 합니다.
dispatching-parallel-agents
기타이 Claude Skill은 3개 이상의 독립적인 문제를 동시에 조사하고 해결하기 위해 다중 에이전트를 배치합니다. 공유 상태나 의존성 없이 해결 가능한 무관련 장애 시나리오에 맞게 설계되었습니다. 핵심 기능은 병렬 문제 해결로, 각 독립 문제 영역마다 하나의 에이전트를 할당하여 효율성을 극대화합니다.
