orchestrate-ml-pipeline
について
このスキルは、PrefectまたはAirflowを使用してエンドツーエンドのMLパイプラインをオーケストレーションし、DAG構築、タスク依存関係、リトライロジック、スケジューリング、監視を処理します。本番ワークフローでは、MLflow、DVC、およびフィーチャーストアと統合されます。データ取り込みからデプロイメントまでのマルチステップワークフローを自動化し、モデルの再トレーニングをスケジュールし、分散タスクを調整し、障害回復を管理するためにご利用ください。
クイックインストール
Claude Code
推奨npx skills add pjt222/agent-almanac -a claude-code/plugin add https://github.com/pjt222/agent-almanacgit clone https://github.com/pjt222/agent-almanac.git ~/.claude/skills/orchestrate-ml-pipelineこのコマンドをClaude Codeにコピー&ペーストしてスキルをインストールします
ドキュメント
MLパイプラインのオーケストレーション
完全な設定ファイルとテンプレートについてはExtended Examplesを参照。
依存関係管理、スケジューリング、モニタリングを備えたエンドツーエンドの機械学習パイプラインを構築しオーケストレーションする。
使用タイミング
- データ取り込みからデプロイまでの複数ステップのMLワークフローを自動化する時
- 新しいデータでの定期的なモデル再トレーニングをスケジュールする時
- 分散データ処理とトレーニングタスクを調整する時
- MLパイプラインステージ間の複雑な依存関係を実装する時
- リトライロジックと障害復旧を管理する時
- パイプライン実行のモニタリングと障害時のアラートを行う時
- 特徴エンジニアリング、トレーニング、評価、デプロイをオーケストレーションする時
- 環境間で再現可能なMLワークフローを構築する時
入力
- 必須: MLパイプラインコンポーネント(データ取り込み、前処理、トレーニング、評価)
- 必須: オーケストレーションフレームワークの選択(Prefect、Airflow、Kubeflow)
- 必須: オーケストレーションライブラリがインストールされたPython環境
- 任意: 分散実行用のKubernetesクラスター
- 任意: 実験ログ用のMLflowトラッキングサーバー
- 任意: データバージョニング用のDVC
- 任意: アラート用のSlack/メール
- 任意: モニタリングインフラストラクチャ(Prometheus、Grafana)
手順
ステップ1: オーケストレーションフレームワークの選択とインストール
適切なフレームワークを選択し、インフラストラクチャをセットアップする。
# Option 1: Prefect (modern, Pythonic, simpler)
pip install prefect
pip install prefect-aws prefect-dask prefect-docker
# Start Prefect server (local development)
prefect server start
# Or use Prefect Cloud (managed)
# ... (see EXAMPLES.md for complete implementation)
Airflow用のDocker Compose:
# docker-compose.airflow.yml
version: '3.8'
x-airflow-common: &airflow-common
image: apache/airflow:2.8.0
environment:
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
# ... (see EXAMPLES.md for complete implementation)
期待結果: オーケストレーションフレームワークがインストールされ、Web UI(Prefectは http://localhost:4200、Airflowは http://localhost:8080)にアクセスでき、データベースが初期化され、スケジューラーが実行されていること。
失敗時: ポートの利用可能性を確認する(netstat -tulpn | grep 8080)、データベース接続を確認する、CeleryにRedisが実行中であることを確認する、Pythonバージョンの互換性を確認する(Airflowは≥3.8が必要)、コンテナ化セットアップ用にDockerデーモンを確認する、初期化エラーのログを検査する。
ステップ2: PrefectでMLパイプラインを構築する
各パイプラインステージのタスクを持つPrefectフローを作成する。
# prefect_ml_pipeline.py
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
import pandas as pd
import mlflow
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
# ... (see EXAMPLES.md for complete implementation)
デプロイとスケジュール:
# deploy_prefect.py
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule
from prefect_ml_pipeline import ml_training_pipeline
# Create deployment with schedule
deployment = Deployment.build_from_flow(
flow=ml_training_pipeline,
# ... (see EXAMPLES.md for complete implementation)
期待結果: Prefectフローがすべてのタスクを正しい順序で実行し、タスクの失敗が自動的にリトライをトリガーし、成功した実行がUIでグリーンに表示され、MLflowが実験をログし、モデルが登録されデプロイされること。
失敗時: タスク依存関係が正しく定義されているか確認する、MLflowサーバーにアクセスできるか確認する、データソースパスが正しいか確認する、循環依存関係がないか確認する、タスクタイムアウト制限を確認する、詳細なエラーのためにPrefectログを検査する、リソースの利用可能性(メモリ/CPU)を確認する。
ステップ3: AirflowでMLパイプラインを構築する
本番MLワークフロー用のAirflow DAGを作成する。
# dags/ml_training_dag.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta
import mlflow
import pandas as pd
# ... (see EXAMPLES.md for complete implementation)
期待結果: DAGがAirflow UIに表示され、スケジュールされた実行が時間通りに実行され、タスクの失敗がリトライとアラートをトリガーし、XComがタスク間でデータを渡し、MLflow統合が実験をログすること。
失敗時: DAGファイルの構文を確認する(python dags/ml_training_dag.py)、Airflow環境でインポートが利用可能か確認する、XComがサイズ制限を超えていないか確認する(大きなデータにはファイルパスを使用する)、アラートのメール設定を確認する、スケジューラーが実行中か確認する、Airflow UIでタスクログを検査する。
ステップ4: 高度な機能の実装
動的DAG、分岐、並列実行を追加する。
# advanced_pipeline.py (Prefect)
from prefect import flow, task
from prefect.task_runners import DaskTaskRunner, ConcurrentTaskRunner
import time
@task
def process_shard(shard_id: int, data: list) -> dict:
"""Process data shard in parallel."""
# ... (see EXAMPLES.md for complete implementation)
Airflowの分岐:
# Airflow branching with BranchPythonOperator
from airflow.operators.python import BranchPythonOperator
def check_data_quality(**context):
"""Decide which branch to take."""
data_path = context['ti'].xcom_pull(key='data_path')
df = pd.read_csv(data_path)
# ... (see EXAMPLES.md for complete implementation)
期待結果: 並列タスクが同時に実行される(より高速なパイプライン)、条件分岐がロジックに基づいて実行される、動的タスク生成が機能する、Daskクラスターが作業を分散すること。
失敗時: Daskクラスターが設定されアクセス可能か確認する、task_runnerが指定されているか確認する、分岐が有効なタスクIDを返すか確認する、並列タスクのリソース競合を確認する、条件ロジックの正確性を確認する。
ステップ5: モニタリングとアラートの統合
包括的なモニタリングと障害通知を追加する。
# monitoring_integration.py
from prefect.blocks.notifications import SlackWebhook
from prefect import flow, task, get_run_logger
from prefect.context import FlowRunContext
@task(on_failure=[notify_failure])
def critical_task():
"""Task with failure notification."""
# ... (see EXAMPLES.md for complete implementation)
センサーを使用したAirflowモニタリング:
# Airflow SLA and monitoring
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
default_args = {
'sla': timedelta(hours=4), # Alert if task exceeds 4 hours
'on_failure_callback': slack_alert_failure,
'on_success_callback': slack_alert_success,
# ... (see EXAMPLES.md for complete implementation)
期待結果: 障害時にSlack/メール通知が送信される、SLA違反がアラートをトリガーする、カスタムメトリクスが追跡される、ログがモニタリングシステムに集約されること。
失敗時: Slack Webhookが正しく設定されているか確認する、メールSMTP設定を確認する、通知ブロックが適切にロードされているか確認する、SLA値が合理的か確認する、通知をブロックしているネットワーク問題がないか確認する。
ステップ6: パイプライン用CI/CDの実装
パイプラインデプロイメントのバージョン管理と自動化を行う。
# .github/workflows/deploy-pipeline.yml
name: Deploy ML Pipeline
on:
push:
branches: [main]
paths:
- 'pipelines/**'
# ... (see EXAMPLES.md for complete implementation)
期待結果: デプロイ前にパイプラインテストが合格する、本番への自動デプロイ、成功したデプロイ時にチームに通知、パイプラインのバージョニングがGitで追跡されること。
失敗時: テストカバレッジと失敗を確認する、Prefect Cloudの認証情報を確認する、デプロイスクリプトがエラーを処理しているか確認する、Slack Webhook設定を確認する、デプロイエラーのCIログを検査する。
バリデーション
- オーケストレーションフレームワークがインストールされ実行されている
- パイプラインDAGが正しい依存関係で定義されている
- すべてのタスクが適切な順序で実行される
- 障害時にリトライロジックが正しく機能する
- スケジュールされた実行が時間通りに実行される
- MLflow統合が実験をログする
- DVC統合がデータをバージョン管理する
- 並列タスクが同時に実行される
- 条件分岐が正しく動作する
- モニタリングとアラートが機能している
- CI/CDパイプラインが自動的にデプロイする
- パイプラインが環境間で再現可能である
よくある落とし穴
- 循環依存関係: タスクAがBに依存し、BがAに依存する — DAG構造を慎重に設計し、Airflow/Prefectのバリデーターを使用する
- メモリリーク: 長時間実行されるタスクがメモリを蓄積する — タスクタイムアウトを設定し、リソース使用量をモニタリングし、ワーカーを定期的に再起動する
- XComサイズ制限: XCom経由で大きなデータを渡す — 直接シリアライゼーションの代わりにファイルパスまたは外部ストレージ(S3)を使用する
- タイムゾーンの混乱: 間違った時間に実行がスケジュールされる — 常にUTCを使用し、スケジュールにタイムゾーンを明示的に設定する
- リトライの欠如: 一時的なエラーでタスクが永続的に失敗する — 指数バックオフ付きのリトライを設定する
- 密結合: タスクが実装の詳細に直接依存する — 明確なインターフェースを使用し、パラメータを明示的に渡す
- 冪等性の欠如: タスクの再実行が重複やエラーを引き起こす — タスクを冪等に設計する(リトライしても安全)
- 不十分なエラー処理: 障害が有用なコンテキストを提供しない — 詳細なログを追加し、例外を適切にキャプチャする
- リソース競合: 並列タスクがリソースを圧迫する — 同時実行数を制限し、リソースクォータを設定する
- バージョン競合: 異なるタスクが互換性のない依存関係を必要とする — タスクの分離にDockerコンテナを使用する
関連スキル
track-ml-experiments— パイプラインタスクにMLflowトラッキングを統合するversion-ml-data— パイプラインでデータバージョニングにDVCを使用するbuild-feature-store— パイプラインタスクとして特徴を具現化するdeploy-ml-model-serving— 最終パイプラインステージとしてデプロイメントを追加するdeploy-to-kubernetes— Kubernetes上でオーケストレーションされたパイプラインを実行する
GitHub リポジトリ
関連スキル
llamaguard
その他LlamaGuardは、暴力やヘイトスピーチなど6つの安全性カテゴリーにおいて、LLMの入力と出力をモデレートするMetaの70-80億パラメータモデルです。94〜95%の精度を提供し、vLLM、Hugging Face、Amazon SageMakerを使用してデプロイ可能です。このスキルを使用して、AIアプリケーションにコンテンツフィルタリングと安全策を簡単に統合できます。
cost-optimization
その他このClaudeスキルは、リソースの適正サイジング、タグ付け戦略、支出分析を通じて、開発者がクラウドコストを最適化することを支援します。AWS、Azure、GCPにわたるクラウド支出の削減とコストガバナンスの実施のためのフレームワークを提供します。インフラコストの分析、リソースの適正サイジング、または予算制約への対応が必要な際にご利用ください。
quantizing-models-bitsandbytes
その他このスキルは、bitsandbytesを使用してLLMを8ビットまたは4ビット精度に量子化し、精度の低下を最小限に抑えつつ50〜75%のメモリ削減を実現します。限られたGPUメモリでより大規模なモデルを実行したり、推論を高速化するのに理想的で、INT8、NF4、FP4などのフォーマットをサポートしています。HuggingFace Transformersと統合され、QLoRAトレーニングや8ビットオプティマイザーを可能にします。
dispatching-parallel-agents
その他このClaudeスキルは、複数のエージェントを配備し、3つ以上の独立した問題を並行して調査・修正します。共有状態や依存関係がなく解決可能な、無関係な障害が発生するシナリオ向けに設計されています。中核となる機能は並列問題解決であり、効率を最大化するために独立した問題領域ごとに1つのエージェントを割り当てます。
