orchestrate-ml-pipeline
について
このスキルは、PrefectまたはAirflowを使用してエンドツーエンドの機械学習パイプラインを調整し、DAGの構築、タスク依存関係の管理、スケジューリング、およびリトライロジックを処理します。本番ワークフローではMLflow、DVC、およびフィーチャーストアと統合します。データ取り込みからデプロイメントまでの多段階MLプロセスの自動化、または定期的な再学習と障害回復の管理にご利用ください。
クイックインストール
Claude Code
推奨npx skills add pjt222/agent-almanac -a claude-code/plugin add https://github.com/pjt222/agent-almanacgit clone https://github.com/pjt222/agent-almanac.git ~/.claude/skills/orchestrate-ml-pipelineこのコマンドをClaude Codeにコピー&ペーストしてスキルをインストールします
ドキュメント
編 ML 管道
詳例見 Extended Examples。
以依管、排程、與察建並編端至端 ML 管道。
用時
- 自動多步 ML 流自據入至部署乃用
- 新據上排定期模型重訓乃用
- 分布據處與訓任務之協調乃用
- 行 ML 管階間複依乃用
- 重試邏輯與失敗復管乃用
- 察管道行與失敗警乃用
- 編特徵工、訓、評、部之諸階乃用
- 跨環境建可復 ML 流乃用
入
- 必要:ML 管件(據入、預處、訓、評)
- 必要:編框選(Prefect、Airflow、Kubeflow)
- 必要:裝編庫之 Python 環
- 可選:Kubernetes 群為分布行
- 可選:MLflow 追蹤服以記實驗
- 可選:DVC 為據版本
- 可選:Slack/郵以警
- 可選:察基(Prometheus、Grafana)
法
第一步:選並裝編框
擇宜框並設基設。
# Option 1: Prefect (modern, Pythonic, simpler)
pip install prefect
pip install prefect-aws prefect-dask prefect-docker
# Start Prefect server (local development)
prefect server start
# Or use Prefect Cloud (managed)
# ... (see EXAMPLES.md for complete implementation)
Airflow 之 Docker Compose:
# docker-compose.airflow.yml
version: '3.8'
x-airflow-common: &airflow-common
image: apache/airflow:2.8.0
environment:
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
# ... (see EXAMPLES.md for complete implementation)
得: 編框已裝,網 UI 可達(Prefect 於 http://localhost:4200,Airflow 於 http://localhost:8080),庫已初,排程行。
敗則: 察端可用(netstat -tulpn | grep 8080),驗庫連,確 Celery 之 Redis 行,察 Python 版合(Airflow 須 ≥3.8),驗容 Docker 守護,閱日察初誤。
第二步:以 Prefect 建 ML 管道
建 Prefect flow,各階為 task。
# prefect_ml_pipeline.py
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
import pandas as pd
import mlflow
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
# ... (see EXAMPLES.md for complete implementation)
部並排:
# deploy_prefect.py
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule
from prefect_ml_pipeline import ml_training_pipeline
# Create deployment with schedule
deployment = Deployment.build_from_flow(
flow=ml_training_pipeline,
# ... (see EXAMPLES.md for complete implementation)
得: Prefect flow 諸 task 依序行,task 失敗自重試,成行於 UI 現綠,MLflow 記實驗,模型已註並部。
敗則: 察 task 依正定,驗 MLflow 服可達,確據源徑正,察循依,驗 task 超時,閱 Prefect 日誌詳誤,察資(記/CPU)。
第三步:以 Airflow 建 ML 管道
建 Airflow DAG 為生產 ML 流。
# dags/ml_training_dag.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta
import mlflow
import pandas as pd
# ... (see EXAMPLES.md for complete implementation)
得: DAG 現於 Airflow UI,排行按時,task 失敗觸重試與警,XCom 傳據於 task 間,MLflow 整合記實驗。
敗則: 察 DAG 文語法(python dags/ml_training_dag.py),驗入於 Airflow 環可得,確 XCom 不逾大限(大據用文徑),察郵設供警,驗排程行,閱 Airflow UI 中 task 日。
第四步:行進階能
加動 DAG、分支、平行行。
# advanced_pipeline.py (Prefect)
from prefect import flow, task
from prefect.task_runners import DaskTaskRunner, ConcurrentTaskRunner
import time
@task
def process_shard(shard_id: int, data: list) -> dict:
"""Process data shard in parallel."""
# ... (see EXAMPLES.md for complete implementation)
Airflow 分支:
# Airflow branching with BranchPythonOperator
from airflow.operators.python import BranchPythonOperator
def check_data_quality(**context):
"""Decide which branch to take."""
data_path = context['ti'].xcom_pull(key='data_path')
df = pd.read_csv(data_path)
# ... (see EXAMPLES.md for complete implementation)
得: 平行 task 並發行(管道更快),條件分支依邏行,動 task 生行,Dask 群分工。
敗則: 察 Dask 群已設可達,驗 task_runner 已指,確分支返有效 task ID,察平行 task 之資爭,驗條件邏正。
第五步:合察與警
加全察與失敗通。
# monitoring_integration.py
from prefect.blocks.notifications import SlackWebhook
from prefect import flow, task, get_run_logger
from prefect.context import FlowRunContext
@task(on_failure=[notify_failure])
def critical_task():
"""Task with failure notification."""
# ... (see EXAMPLES.md for complete implementation)
Airflow 之 sensor 察:
# Airflow SLA and monitoring
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
default_args = {
'sla': timedelta(hours=4), # Alert if task exceeds 4 hours
'on_failure_callback': slack_alert_failure,
'on_success_callback': slack_alert_success,
# ... (see EXAMPLES.md for complete implementation)
得: Slack/郵通於失敗送,SLA 違觸警,自度量追,日誌聚於察系。
敗則: 驗 Slack webhook 正設,察郵 SMTP 設,確通塊已正載,驗 SLA 值合理,察阻通之網問。
第六步:行管道之 CI/CD
版控並自動管道部。
# .github/workflows/deploy-pipeline.yml
name: Deploy ML Pipeline
on:
push:
branches: [main]
paths:
- 'pipelines/**'
# ... (see EXAMPLES.md for complete implementation)
得: 部前管試過,自動部生產,成部告隊,管版於 Git 追。
敗則: 察試覆與失敗,驗 Prefect Cloud 憑,確部腳本處誤,察 Slack webhook 設,閱 CI 日察部誤。
驗
- 編框已裝且行
- 管道 DAG 定,依正
- 諸 task 依正序行
- 失敗時重試邏輯正
- 排行按時行
- MLflow 整合記實驗
- DVC 整合記據版本
- 平行 task 並發行
- 條件分支正
- 察與警可行
- CI/CD 自動部
- 管道跨環可復
陷
- 循依:A 依 B,B 依 A——慎設 DAG 構,用 Airflow/Prefect 驗器
- 記漏:長 task 累記——設 task 超時,察資用,定期重啟工
- XCom 大限:以 XCom 傳大據——用文徑或外存(S3)非直序
- 時區混:誤時排——必用 UTC,明設排程時區
- 無重試:暫誤致 task 永敗——設指數退之重試
- 緊耦:task 直依實現——用清接,明傳參
- 無冪等:重行致重複或誤——設冪等之 task(重試安)
- 誤處差:失敗無有用脈——加詳日誌,正捕異
- 資爭:平行 task 壓資——限並發,設資配額
- 版衝:諸 task 須不容依——用 Docker 容隔離
參
track-ml-experiments- 合 MLflow 追於管道 taskversion-ml-data- 用 DVC 為管中據版本build-feature-store- 物化特徵為管 taskdeploy-ml-model-serving- 加部為終管階deploy-to-kubernetes- 於 Kubernetes 行編管道
GitHub リポジトリ
関連スキル
llamaguard
その他LlamaGuardは、暴力やヘイトスピーチなど6つの安全性カテゴリーにおいて、LLMの入力と出力をモデレートするMetaの70-80億パラメータモデルです。94〜95%の精度を提供し、vLLM、Hugging Face、Amazon SageMakerを使用してデプロイ可能です。このスキルを使用して、AIアプリケーションにコンテンツフィルタリングと安全策を簡単に統合できます。
cost-optimization
その他このClaudeスキルは、リソースの適正サイジング、タグ付け戦略、支出分析を通じて、開発者がクラウドコストを最適化することを支援します。AWS、Azure、GCPにわたるクラウド支出の削減とコストガバナンスの実施のためのフレームワークを提供します。インフラコストの分析、リソースの適正サイジング、または予算制約への対応が必要な際にご利用ください。
quantizing-models-bitsandbytes
その他このスキルは、bitsandbytesを使用してLLMを8ビットまたは4ビット精度に量子化し、精度の低下を最小限に抑えつつ50〜75%のメモリ削減を実現します。限られたGPUメモリでより大規模なモデルを実行したり、推論を高速化するのに理想的で、INT8、NF4、FP4などのフォーマットをサポートしています。HuggingFace Transformersと統合され、QLoRAトレーニングや8ビットオプティマイザーを可能にします。
dispatching-parallel-agents
その他このClaudeスキルは、複数のエージェントを配備し、3つ以上の独立した問題を並行して調査・修正します。共有状態や依存関係がなく解決可能な、無関係な障害が発生するシナリオ向けに設計されています。中核となる機能は並列問題解決であり、効率を最大化するために独立した問題領域ごとに1つのエージェントを割り当てます。
