orchestrate-ml-pipeline
À propos
Cette compétence orchestre des pipelines de ML de bout en bout en utilisant Prefect ou Airflow, gérant la construction de DAG, les dépendances des tâches, la logique de reprise, la planification et la surveillance. Elle s'intègre avec MLflow, DVC et les feature stores pour les workflows de production. Utilisez-la pour automatiser des workflows multi-étapes de l'ingestion des données au déploiement, planifier le réentraînement des modèles, coordonner des tâches distribuées et gérer la reprise après échec.
Installation rapide
Claude Code
Recommandénpx skills add pjt222/agent-almanac -a claude-code/plugin add https://github.com/pjt222/agent-almanacgit clone https://github.com/pjt222/agent-almanac.git ~/.claude/skills/orchestrate-ml-pipelineCopiez et collez cette commande dans Claude Code pour installer cette compétence
Documentation
MLパイプラインのオーケストレーション
完全な設定ファイルとテンプレートについてはExtended Examplesを参照。
依存関係管理、スケジューリング、モニタリングを備えたエンドツーエンドの機械学習パイプラインを構築しオーケストレーションする。
使用タイミング
- データ取り込みからデプロイまでの複数ステップのMLワークフローを自動化する時
- 新しいデータでの定期的なモデル再トレーニングをスケジュールする時
- 分散データ処理とトレーニングタスクを調整する時
- MLパイプラインステージ間の複雑な依存関係を実装する時
- リトライロジックと障害復旧を管理する時
- パイプライン実行のモニタリングと障害時のアラートを行う時
- 特徴エンジニアリング、トレーニング、評価、デプロイをオーケストレーションする時
- 環境間で再現可能なMLワークフローを構築する時
入力
- 必須: MLパイプラインコンポーネント(データ取り込み、前処理、トレーニング、評価)
- 必須: オーケストレーションフレームワークの選択(Prefect、Airflow、Kubeflow)
- 必須: オーケストレーションライブラリがインストールされたPython環境
- 任意: 分散実行用のKubernetesクラスター
- 任意: 実験ログ用のMLflowトラッキングサーバー
- 任意: データバージョニング用のDVC
- 任意: アラート用のSlack/メール
- 任意: モニタリングインフラストラクチャ(Prometheus、Grafana)
手順
ステップ1: オーケストレーションフレームワークの選択とインストール
適切なフレームワークを選択し、インフラストラクチャをセットアップする。
# Option 1: Prefect (modern, Pythonic, simpler)
pip install prefect
pip install prefect-aws prefect-dask prefect-docker
# Start Prefect server (local development)
prefect server start
# Or use Prefect Cloud (managed)
# ... (see EXAMPLES.md for complete implementation)
Airflow用のDocker Compose:
# docker-compose.airflow.yml
version: '3.8'
x-airflow-common: &airflow-common
image: apache/airflow:2.8.0
environment:
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
# ... (see EXAMPLES.md for complete implementation)
期待結果: オーケストレーションフレームワークがインストールされ、Web UI(Prefectは http://localhost:4200、Airflowは http://localhost:8080)にアクセスでき、データベースが初期化され、スケジューラーが実行されていること。
失敗時: ポートの利用可能性を確認する(netstat -tulpn | grep 8080)、データベース接続を確認する、CeleryにRedisが実行中であることを確認する、Pythonバージョンの互換性を確認する(Airflowは≥3.8が必要)、コンテナ化セットアップ用にDockerデーモンを確認する、初期化エラーのログを検査する。
ステップ2: PrefectでMLパイプラインを構築する
各パイプラインステージのタスクを持つPrefectフローを作成する。
# prefect_ml_pipeline.py
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
import pandas as pd
import mlflow
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
# ... (see EXAMPLES.md for complete implementation)
デプロイとスケジュール:
# deploy_prefect.py
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule
from prefect_ml_pipeline import ml_training_pipeline
# Create deployment with schedule
deployment = Deployment.build_from_flow(
flow=ml_training_pipeline,
# ... (see EXAMPLES.md for complete implementation)
期待結果: Prefectフローがすべてのタスクを正しい順序で実行し、タスクの失敗が自動的にリトライをトリガーし、成功した実行がUIでグリーンに表示され、MLflowが実験をログし、モデルが登録されデプロイされること。
失敗時: タスク依存関係が正しく定義されているか確認する、MLflowサーバーにアクセスできるか確認する、データソースパスが正しいか確認する、循環依存関係がないか確認する、タスクタイムアウト制限を確認する、詳細なエラーのためにPrefectログを検査する、リソースの利用可能性(メモリ/CPU)を確認する。
ステップ3: AirflowでMLパイプラインを構築する
本番MLワークフロー用のAirflow DAGを作成する。
# dags/ml_training_dag.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta
import mlflow
import pandas as pd
# ... (see EXAMPLES.md for complete implementation)
期待結果: DAGがAirflow UIに表示され、スケジュールされた実行が時間通りに実行され、タスクの失敗がリトライとアラートをトリガーし、XComがタスク間でデータを渡し、MLflow統合が実験をログすること。
失敗時: DAGファイルの構文を確認する(python dags/ml_training_dag.py)、Airflow環境でインポートが利用可能か確認する、XComがサイズ制限を超えていないか確認する(大きなデータにはファイルパスを使用する)、アラートのメール設定を確認する、スケジューラーが実行中か確認する、Airflow UIでタスクログを検査する。
ステップ4: 高度な機能の実装
動的DAG、分岐、並列実行を追加する。
# advanced_pipeline.py (Prefect)
from prefect import flow, task
from prefect.task_runners import DaskTaskRunner, ConcurrentTaskRunner
import time
@task
def process_shard(shard_id: int, data: list) -> dict:
"""Process data shard in parallel."""
# ... (see EXAMPLES.md for complete implementation)
Airflowの分岐:
# Airflow branching with BranchPythonOperator
from airflow.operators.python import BranchPythonOperator
def check_data_quality(**context):
"""Decide which branch to take."""
data_path = context['ti'].xcom_pull(key='data_path')
df = pd.read_csv(data_path)
# ... (see EXAMPLES.md for complete implementation)
期待結果: 並列タスクが同時に実行される(より高速なパイプライン)、条件分岐がロジックに基づいて実行される、動的タスク生成が機能する、Daskクラスターが作業を分散すること。
失敗時: Daskクラスターが設定されアクセス可能か確認する、task_runnerが指定されているか確認する、分岐が有効なタスクIDを返すか確認する、並列タスクのリソース競合を確認する、条件ロジックの正確性を確認する。
ステップ5: モニタリングとアラートの統合
包括的なモニタリングと障害通知を追加する。
# monitoring_integration.py
from prefect.blocks.notifications import SlackWebhook
from prefect import flow, task, get_run_logger
from prefect.context import FlowRunContext
@task(on_failure=[notify_failure])
def critical_task():
"""Task with failure notification."""
# ... (see EXAMPLES.md for complete implementation)
センサーを使用したAirflowモニタリング:
# Airflow SLA and monitoring
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
default_args = {
'sla': timedelta(hours=4), # Alert if task exceeds 4 hours
'on_failure_callback': slack_alert_failure,
'on_success_callback': slack_alert_success,
# ... (see EXAMPLES.md for complete implementation)
期待結果: 障害時にSlack/メール通知が送信される、SLA違反がアラートをトリガーする、カスタムメトリクスが追跡される、ログがモニタリングシステムに集約されること。
失敗時: Slack Webhookが正しく設定されているか確認する、メールSMTP設定を確認する、通知ブロックが適切にロードされているか確認する、SLA値が合理的か確認する、通知をブロックしているネットワーク問題がないか確認する。
ステップ6: パイプライン用CI/CDの実装
パイプラインデプロイメントのバージョン管理と自動化を行う。
# .github/workflows/deploy-pipeline.yml
name: Deploy ML Pipeline
on:
push:
branches: [main]
paths:
- 'pipelines/**'
# ... (see EXAMPLES.md for complete implementation)
期待結果: デプロイ前にパイプラインテストが合格する、本番への自動デプロイ、成功したデプロイ時にチームに通知、パイプラインのバージョニングがGitで追跡されること。
失敗時: テストカバレッジと失敗を確認する、Prefect Cloudの認証情報を確認する、デプロイスクリプトがエラーを処理しているか確認する、Slack Webhook設定を確認する、デプロイエラーのCIログを検査する。
バリデーション
- オーケストレーションフレームワークがインストールされ実行されている
- パイプラインDAGが正しい依存関係で定義されている
- すべてのタスクが適切な順序で実行される
- 障害時にリトライロジックが正しく機能する
- スケジュールされた実行が時間通りに実行される
- MLflow統合が実験をログする
- DVC統合がデータをバージョン管理する
- 並列タスクが同時に実行される
- 条件分岐が正しく動作する
- モニタリングとアラートが機能している
- CI/CDパイプラインが自動的にデプロイする
- パイプラインが環境間で再現可能である
よくある落とし穴
- 循環依存関係: タスクAがBに依存し、BがAに依存する — DAG構造を慎重に設計し、Airflow/Prefectのバリデーターを使用する
- メモリリーク: 長時間実行されるタスクがメモリを蓄積する — タスクタイムアウトを設定し、リソース使用量をモニタリングし、ワーカーを定期的に再起動する
- XComサイズ制限: XCom経由で大きなデータを渡す — 直接シリアライゼーションの代わりにファイルパスまたは外部ストレージ(S3)を使用する
- タイムゾーンの混乱: 間違った時間に実行がスケジュールされる — 常にUTCを使用し、スケジュールにタイムゾーンを明示的に設定する
- リトライの欠如: 一時的なエラーでタスクが永続的に失敗する — 指数バックオフ付きのリトライを設定する
- 密結合: タスクが実装の詳細に直接依存する — 明確なインターフェースを使用し、パラメータを明示的に渡す
- 冪等性の欠如: タスクの再実行が重複やエラーを引き起こす — タスクを冪等に設計する(リトライしても安全)
- 不十分なエラー処理: 障害が有用なコンテキストを提供しない — 詳細なログを追加し、例外を適切にキャプチャする
- リソース競合: 並列タスクがリソースを圧迫する — 同時実行数を制限し、リソースクォータを設定する
- バージョン競合: 異なるタスクが互換性のない依存関係を必要とする — タスクの分離にDockerコンテナを使用する
関連スキル
track-ml-experiments— パイプラインタスクにMLflowトラッキングを統合するversion-ml-data— パイプラインでデータバージョニングにDVCを使用するbuild-feature-store— パイプラインタスクとして特徴を具現化するdeploy-ml-model-serving— 最終パイプラインステージとしてデプロイメントを追加するdeploy-to-kubernetes— Kubernetes上でオーケストレーションされたパイプラインを実行する
Dépôt GitHub
Compétences associées
llamaguard
AutreLlamaGuard est le modèle de Meta, doté de 7 à 8 milliards de paramètres, conçu pour modérer les entrées et sorties des LLM selon six catégories de sécurité comme la violence et les discours haineux. Il offre une précision de 94 à 95 % et peut être déployé avec vLLM, Hugging Face ou Amazon SageMaker. Utilisez cette compétence pour intégrer facilement le filtrage de contenu et des garde-fous de sécurité dans vos applications d'IA.
cost-optimization
AutreCette compétence de Claude aide les développeurs à optimiser les coûts du cloud grâce au redimensionnement des ressources, aux stratégies d'étiquetage et à l'analyse des dépenses. Elle fournit un cadre pour réduire les dépenses cloud et mettre en œuvre une gouvernance des coûts sur AWS, Azure et GCP. Utilisez-la lorsque vous devez analyser les coûts d'infrastructure, redimensionner les ressources ou respecter des contraintes budgétaires.
quantizing-models-bitsandbytes
AutreCette compétence quantifie les LLMs en précision 8 bits ou 4 bits à l'aide de bitsandbytes, permettant une réduction de 50 à 75 % de la mémoire utilisée avec une perte de précision minime. Elle est idéale pour exécuter des modèles plus volumineux sur une mémoire GPU limitée ou pour accélérer l'inférence, prenant en charge des formats comme INT8, NF4 et FP4. La compétence s'intègre à HuggingFace Transformers et permet l'entraînement QLoRA ainsi que l'utilisation d'optimiseurs en 8 bits.
dispatching-parallel-agents
AutreCette compétence Claude déploie plusieurs agents pour enquêter et résoudre simultanément 3 problèmes indépendants ou plus. Elle est conçue pour des scénarios impliquant des défaillances non liées qui peuvent être résolues sans état partagé ni dépendances. La capacité fondamentale est la résolution de problèmes en parallèle, en assignant un agent par domaine problématique indépendant afin de maximiser l'efficacité.
