orchestrate-ml-pipeline
О программе
Этот навык организует сквозные ML-пайплайны с использованием Prefect или Airflow, управляя построением DAG, зависимостями задач, логикой повторов, планированием и мониторингом. Он интегрируется с MLflow, DVC и хранилищами признаков для рабочих процессов в продакшене. Используйте его для автоматизации многоэтапных процессов от приёма данных до развёртывания, планирования переобучения моделей, координации распределённых задач и управления восстановлением после сбоев.
Быстрая установка
Claude Code
Рекомендуетсяnpx skills add pjt222/agent-almanac -a claude-code/plugin add https://github.com/pjt222/agent-almanacgit clone https://github.com/pjt222/agent-almanac.git ~/.claude/skills/orchestrate-ml-pipelineСкопируйте и вставьте эту команду в Claude Code для установки этого навыка
Документация
MLパイプラインのオーケストレーション
完全な設定ファイルとテンプレートについてはExtended Examplesを参照。
依存関係管理、スケジューリング、モニタリングを備えたエンドツーエンドの機械学習パイプラインを構築しオーケストレーションする。
使用タイミング
- データ取り込みからデプロイまでの複数ステップのMLワークフローを自動化する時
- 新しいデータでの定期的なモデル再トレーニングをスケジュールする時
- 分散データ処理とトレーニングタスクを調整する時
- MLパイプラインステージ間の複雑な依存関係を実装する時
- リトライロジックと障害復旧を管理する時
- パイプライン実行のモニタリングと障害時のアラートを行う時
- 特徴エンジニアリング、トレーニング、評価、デプロイをオーケストレーションする時
- 環境間で再現可能なMLワークフローを構築する時
入力
- 必須: MLパイプラインコンポーネント(データ取り込み、前処理、トレーニング、評価)
- 必須: オーケストレーションフレームワークの選択(Prefect、Airflow、Kubeflow)
- 必須: オーケストレーションライブラリがインストールされたPython環境
- 任意: 分散実行用のKubernetesクラスター
- 任意: 実験ログ用のMLflowトラッキングサーバー
- 任意: データバージョニング用のDVC
- 任意: アラート用のSlack/メール
- 任意: モニタリングインフラストラクチャ(Prometheus、Grafana)
手順
ステップ1: オーケストレーションフレームワークの選択とインストール
適切なフレームワークを選択し、インフラストラクチャをセットアップする。
# Option 1: Prefect (modern, Pythonic, simpler)
pip install prefect
pip install prefect-aws prefect-dask prefect-docker
# Start Prefect server (local development)
prefect server start
# Or use Prefect Cloud (managed)
# ... (see EXAMPLES.md for complete implementation)
Airflow用のDocker Compose:
# docker-compose.airflow.yml
version: '3.8'
x-airflow-common: &airflow-common
image: apache/airflow:2.8.0
environment:
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
# ... (see EXAMPLES.md for complete implementation)
期待結果: オーケストレーションフレームワークがインストールされ、Web UI(Prefectは http://localhost:4200、Airflowは http://localhost:8080)にアクセスでき、データベースが初期化され、スケジューラーが実行されていること。
失敗時: ポートの利用可能性を確認する(netstat -tulpn | grep 8080)、データベース接続を確認する、CeleryにRedisが実行中であることを確認する、Pythonバージョンの互換性を確認する(Airflowは≥3.8が必要)、コンテナ化セットアップ用にDockerデーモンを確認する、初期化エラーのログを検査する。
ステップ2: PrefectでMLパイプラインを構築する
各パイプラインステージのタスクを持つPrefectフローを作成する。
# prefect_ml_pipeline.py
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
import pandas as pd
import mlflow
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
# ... (see EXAMPLES.md for complete implementation)
デプロイとスケジュール:
# deploy_prefect.py
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule
from prefect_ml_pipeline import ml_training_pipeline
# Create deployment with schedule
deployment = Deployment.build_from_flow(
flow=ml_training_pipeline,
# ... (see EXAMPLES.md for complete implementation)
期待結果: Prefectフローがすべてのタスクを正しい順序で実行し、タスクの失敗が自動的にリトライをトリガーし、成功した実行がUIでグリーンに表示され、MLflowが実験をログし、モデルが登録されデプロイされること。
失敗時: タスク依存関係が正しく定義されているか確認する、MLflowサーバーにアクセスできるか確認する、データソースパスが正しいか確認する、循環依存関係がないか確認する、タスクタイムアウト制限を確認する、詳細なエラーのためにPrefectログを検査する、リソースの利用可能性(メモリ/CPU)を確認する。
ステップ3: AirflowでMLパイプラインを構築する
本番MLワークフロー用のAirflow DAGを作成する。
# dags/ml_training_dag.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta
import mlflow
import pandas as pd
# ... (see EXAMPLES.md for complete implementation)
期待結果: DAGがAirflow UIに表示され、スケジュールされた実行が時間通りに実行され、タスクの失敗がリトライとアラートをトリガーし、XComがタスク間でデータを渡し、MLflow統合が実験をログすること。
失敗時: DAGファイルの構文を確認する(python dags/ml_training_dag.py)、Airflow環境でインポートが利用可能か確認する、XComがサイズ制限を超えていないか確認する(大きなデータにはファイルパスを使用する)、アラートのメール設定を確認する、スケジューラーが実行中か確認する、Airflow UIでタスクログを検査する。
ステップ4: 高度な機能の実装
動的DAG、分岐、並列実行を追加する。
# advanced_pipeline.py (Prefect)
from prefect import flow, task
from prefect.task_runners import DaskTaskRunner, ConcurrentTaskRunner
import time
@task
def process_shard(shard_id: int, data: list) -> dict:
"""Process data shard in parallel."""
# ... (see EXAMPLES.md for complete implementation)
Airflowの分岐:
# Airflow branching with BranchPythonOperator
from airflow.operators.python import BranchPythonOperator
def check_data_quality(**context):
"""Decide which branch to take."""
data_path = context['ti'].xcom_pull(key='data_path')
df = pd.read_csv(data_path)
# ... (see EXAMPLES.md for complete implementation)
期待結果: 並列タスクが同時に実行される(より高速なパイプライン)、条件分岐がロジックに基づいて実行される、動的タスク生成が機能する、Daskクラスターが作業を分散すること。
失敗時: Daskクラスターが設定されアクセス可能か確認する、task_runnerが指定されているか確認する、分岐が有効なタスクIDを返すか確認する、並列タスクのリソース競合を確認する、条件ロジックの正確性を確認する。
ステップ5: モニタリングとアラートの統合
包括的なモニタリングと障害通知を追加する。
# monitoring_integration.py
from prefect.blocks.notifications import SlackWebhook
from prefect import flow, task, get_run_logger
from prefect.context import FlowRunContext
@task(on_failure=[notify_failure])
def critical_task():
"""Task with failure notification."""
# ... (see EXAMPLES.md for complete implementation)
センサーを使用したAirflowモニタリング:
# Airflow SLA and monitoring
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
default_args = {
'sla': timedelta(hours=4), # Alert if task exceeds 4 hours
'on_failure_callback': slack_alert_failure,
'on_success_callback': slack_alert_success,
# ... (see EXAMPLES.md for complete implementation)
期待結果: 障害時にSlack/メール通知が送信される、SLA違反がアラートをトリガーする、カスタムメトリクスが追跡される、ログがモニタリングシステムに集約されること。
失敗時: Slack Webhookが正しく設定されているか確認する、メールSMTP設定を確認する、通知ブロックが適切にロードされているか確認する、SLA値が合理的か確認する、通知をブロックしているネットワーク問題がないか確認する。
ステップ6: パイプライン用CI/CDの実装
パイプラインデプロイメントのバージョン管理と自動化を行う。
# .github/workflows/deploy-pipeline.yml
name: Deploy ML Pipeline
on:
push:
branches: [main]
paths:
- 'pipelines/**'
# ... (see EXAMPLES.md for complete implementation)
期待結果: デプロイ前にパイプラインテストが合格する、本番への自動デプロイ、成功したデプロイ時にチームに通知、パイプラインのバージョニングがGitで追跡されること。
失敗時: テストカバレッジと失敗を確認する、Prefect Cloudの認証情報を確認する、デプロイスクリプトがエラーを処理しているか確認する、Slack Webhook設定を確認する、デプロイエラーのCIログを検査する。
バリデーション
- オーケストレーションフレームワークがインストールされ実行されている
- パイプラインDAGが正しい依存関係で定義されている
- すべてのタスクが適切な順序で実行される
- 障害時にリトライロジックが正しく機能する
- スケジュールされた実行が時間通りに実行される
- MLflow統合が実験をログする
- DVC統合がデータをバージョン管理する
- 並列タスクが同時に実行される
- 条件分岐が正しく動作する
- モニタリングとアラートが機能している
- CI/CDパイプラインが自動的にデプロイする
- パイプラインが環境間で再現可能である
よくある落とし穴
- 循環依存関係: タスクAがBに依存し、BがAに依存する — DAG構造を慎重に設計し、Airflow/Prefectのバリデーターを使用する
- メモリリーク: 長時間実行されるタスクがメモリを蓄積する — タスクタイムアウトを設定し、リソース使用量をモニタリングし、ワーカーを定期的に再起動する
- XComサイズ制限: XCom経由で大きなデータを渡す — 直接シリアライゼーションの代わりにファイルパスまたは外部ストレージ(S3)を使用する
- タイムゾーンの混乱: 間違った時間に実行がスケジュールされる — 常にUTCを使用し、スケジュールにタイムゾーンを明示的に設定する
- リトライの欠如: 一時的なエラーでタスクが永続的に失敗する — 指数バックオフ付きのリトライを設定する
- 密結合: タスクが実装の詳細に直接依存する — 明確なインターフェースを使用し、パラメータを明示的に渡す
- 冪等性の欠如: タスクの再実行が重複やエラーを引き起こす — タスクを冪等に設計する(リトライしても安全)
- 不十分なエラー処理: 障害が有用なコンテキストを提供しない — 詳細なログを追加し、例外を適切にキャプチャする
- リソース競合: 並列タスクがリソースを圧迫する — 同時実行数を制限し、リソースクォータを設定する
- バージョン競合: 異なるタスクが互換性のない依存関係を必要とする — タスクの分離にDockerコンテナを使用する
関連スキル
track-ml-experiments— パイプラインタスクにMLflowトラッキングを統合するversion-ml-data— パイプラインでデータバージョニングにDVCを使用するbuild-feature-store— パイプラインタスクとして特徴を具現化するdeploy-ml-model-serving— 最終パイプラインステージとしてデプロイメントを追加するdeploy-to-kubernetes— Kubernetes上でオーケストレーションされたパイプラインを実行する
GitHub репозиторий
Похожие навыки
llamaguard
ДругоеLlamaGuard — это модель от Meta с 7–8 миллиардами параметров для модерации входных и выходных данных больших языковых моделей по шести категориям безопасности, таким как насилие и разжигание ненависти. Она обеспечивает точность 94–95% и может быть развернута с помощью vLLM, Hugging Face или Amazon SageMaker. Используйте этот навык, чтобы легко интегрировать фильтрацию контента и защитные механизмы в ваши ИИ-приложения.
cost-optimization
ДругоеЭтот навык Claude помогает разработчикам оптимизировать облачные расходы за счет правильного подбора ресурсов, стратегий тегирования и анализа затрат. Он предоставляет framework для сокращения облачных расходов и внедрения управления затратами в AWS, Azure и GCP. Используйте его, когда вам нужно проанализировать расходы на инфраструктуру, оптимизировать ресурсы или уложиться в бюджетные ограничения.
quantizing-models-bitsandbytes
ДругоеЭтот навык выполняет квантизацию LLM до 8-битной или 4-битной точности с использованием библиотеки bitsandbytes, обеспечивая сокращение использования памяти на 50-75% при минимальной потере точности. Он идеально подходит для запуска больших моделей при ограниченной памяти GPU или для ускорения вывода, поддерживая форматы INT8, NF4 и FP4. Навык интегрируется с HuggingFace Transformers и позволяет использовать обучение QLoRA и 8-битные оптимизаторы.
dispatching-parallel-agents
ДругоеЭтот навык Claude распределяет нескольких агентов для исследования и устранения трёх и более независимых проблем параллельно. Он предназначен для сценариев с несвязанными сбоями, которые можно устранить без общего состояния или зависимостей. Ключевая возможность — параллельное решение проблем, где за каждую независимую предметную область назначается отдельный агент для максимальной эффективности.
