返回技能列表

orchestrate-ml-pipeline

pjt222
更新于 6 days ago
22 次查看
17
2
17
在 GitHub 上查看
其他aiautomationdata

关于

This skill orchestrates end-to-end machine learning pipelines using Prefect or Airflow, handling DAG construction, task dependencies, scheduling, and retry logic. It integrates with MLflow, DVC, and feature stores for production workflows. Use it when automating multi-step ML processes from data ingestion to deployment or scheduling periodic model retraining.

快速安装

Claude Code

推荐
主要方式
npx skills add pjt222/agent-almanac -a claude-code
插件命令备选方式
/plugin add https://github.com/pjt222/agent-almanac
Git 克隆备选方式
git clone https://github.com/pjt222/agent-almanac.git ~/.claude/skills/orchestrate-ml-pipeline

在 Claude Code 中复制并粘贴此命令以安装该技能

技能文档

編 ML 線

詳例見 Extended Examples

立全 ML 線—依管、排、察。

  • 自由攝至部之多步 ML 流
  • 排期重訓
  • 協分布處+訓
  • 立 ML 線階間之繁依
  • 管重試+敗復
  • 察線行+敗警
  • 編特工、訓、評、部
  • 立可重之 ML 流跨境

  • :ML 線件(攝、預處、訓、評)
  • :編框擇(Prefect、Airflow、Kubeflow)
  • :Python+編庫
  • :Kubernetes 群為分布
  • :MLflow 跡伺
  • :DVC 為料版
  • :Slack/郵警
  • :察基(Prometheus、Grafana)

一:擇裝編框

擇宜框、立基。

# Option 1: Prefect (modern, Pythonic, simpler)
pip install prefect
pip install prefect-aws prefect-dask prefect-docker

# Start Prefect server (local development)
prefect server start

# Or use Prefect Cloud (managed)
# ... (see EXAMPLES.md for complete implementation)

Airflow 之 Docker Compose:

# docker-compose.airflow.yml
version: '3.8'

x-airflow-common: &airflow-common
  image: apache/airflow:2.8.0
  environment:
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
# ... (see EXAMPLES.md for complete implementation)

得: 框裝、UI 達(Prefect http://localhost:4200、Airflow http://localhost:8080)、庫初、排器行。

敗: 查埠(netstat -tulpn | grep 8080)、驗庫連、確 Redis 行(Celery 須)、Python 版(Airflow 須 ≥ 3.8)、Docker daemon、看初日誌。

二:以 Prefect 立 ML 線

各階為 task 之 flow:

# prefect_ml_pipeline.py
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
import pandas as pd
import mlflow
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
# ... (see EXAMPLES.md for complete implementation)

部+排:

# deploy_prefect.py
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule
from prefect_ml_pipeline import ml_training_pipeline

# Create deployment with schedule
deployment = Deployment.build_from_flow(
    flow=ml_training_pipeline,
# ... (see EXAMPLES.md for complete implementation)

得: Prefect flow 序行諸 task、敗自重試、成則 UI 綠、MLflow 錄驗、模註+部。

敗: 查依正確、MLflow 達、料路正、無循依、task 限期、Prefect 日誌、資源(記憶/CPU)。

三:以 Airflow 立 ML 線

立生產 DAG:

# dags/ml_training_dag.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta
import mlflow
import pandas as pd
# ... (see EXAMPLES.md for complete implementation)

得: DAG 現於 UI、排行於時、敗觸試+警、XCom 傳料、MLflow 錄驗。

敗: 查 DAG 文法(python dags/ml_training_dag.py)、Airflow 環有引、XCom 不超限(大料用文路)、郵配置、排器行、UI 看 task 日誌。

四:施進階

加動 DAG、分支、並行:

# advanced_pipeline.py (Prefect)
from prefect import flow, task
from prefect.task_runners import DaskTaskRunner, ConcurrentTaskRunner
import time

@task
def process_shard(shard_id: int, data: list) -> dict:
    """Process data shard in parallel."""
# ... (see EXAMPLES.md for complete implementation)

Airflow 分支:

# Airflow branching with BranchPythonOperator
from airflow.operators.python import BranchPythonOperator

def check_data_quality(**context):
    """Decide which branch to take."""
    data_path = context['ti'].xcom_pull(key='data_path')
    df = pd.read_csv(data_path)

# ... (see EXAMPLES.md for complete implementation)

得: 並行 task 同行(線速)、條件分支按理執、動 task 生效、Dask 群分。

敗: Dask 群已配且達、task_runner 已指、分支返有效 task ID、並行無資源爭、條件邏輯正。

五:整察+警

加全察+敗報:

# monitoring_integration.py
from prefect.blocks.notifications import SlackWebhook
from prefect import flow, task, get_run_logger
from prefect.context import FlowRunContext

@task(on_failure=[notify_failure])
def critical_task():
    """Task with failure notification."""
# ... (see EXAMPLES.md for complete implementation)

Airflow 之 sensor 察:

# Airflow SLA and monitoring
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.decorators import apply_defaults

default_args = {
    'sla': timedelta(hours=4),  # Alert if task exceeds 4 hours
    'on_failure_callback': slack_alert_failure,
    'on_success_callback': slack_alert_success,
# ... (see EXAMPLES.md for complete implementation)

得: 敗送 Slack/郵、SLA 違觸警、自指追、日誌聚於察系。

敗: 驗 Slack webhook 正、郵 SMTP 設、警塊已載、SLA 值合理、無網阻警。

六:施線之 CI/CD

控版+自部署:

# .github/workflows/deploy-pipeline.yml
name: Deploy ML Pipeline

on:
  push:
    branches: [main]
    paths:
      - 'pipelines/**'
# ... (see EXAMPLES.md for complete implementation)

得: 部前試過、自部於生產、部成告隊、Git 控線版。

敗: 試覆+敗、Prefect Cloud 憑證、部腳本處誤、Slack webhook、CI 日誌。

  • 編框裝且行
  • 線 DAG 立、依正
  • 諸 task 序行
  • 重試於敗起作用
  • 排運於時
  • MLflow 整錄驗
  • DVC 整版料
  • 並行 task 同行
  • 條件分支正
  • 察+警有效
  • CI/CD 自部
  • 線跨境可重

  • 循依:A 依 B、B 依 A—慎設 DAG、用 Airflow/Prefect 驗器
  • 記憶漏:長 task 累記憶—設限期、察用、期重啟工
  • XCom 限:傳大料於 XCom—用文路或外存(S3)
  • 時區惑:排運於誤時—皆用 UTC、明設時區
  • 缺重試:暫誤致 task 永敗—配指數退之重試
  • 緊耦:task 直依實作—用清介、明傳參
  • 無冪等:重行致重複或誤—設冪等(重試安)
  • 誤理弱:敗無境—加詳日誌、捕例
  • 資源爭:並行壓資源—限並、設配額
  • 版衝突:諸 task 須不容依—用 Docker 隔

  • track-ml-experiments
  • version-ml-data
  • build-feature-store
  • deploy-ml-model-serving
  • deploy-to-kubernetes

GitHub 仓库

pjt222/agent-almanac
路径: i18n/wenyan-ultra/skills/orchestrate-ml-pipeline
0
agentsagentskillsai-assisted-developmentclaude-codeskillsteams

相关推荐技能

llamaguard

其他

LlamaGuard是Meta推出的7-8B参数内容审核模型,专门用于过滤LLM的输入和输出内容。它能检测六大安全风险类别(暴力/仇恨、性内容、武器、违禁品、自残、犯罪计划),准确率达94-95%。开发者可通过HuggingFace、vLLM或Sagemaker快速部署,并能与NeMo Guardrails集成实现自动化安全防护。

查看技能

cost-optimization

其他

这个Claude Skill帮助开发者优化云成本,通过资源调整、标记策略和预留实例来降低AWS、Azure和GCP的开支。它适用于减少云支出、分析基础设施成本或实施成本治理策略的场景。关键功能包括提供成本可视化、资源规模调整指导和定价模型优化建议。

查看技能

quantizing-models-bitsandbytes

其他

这个Skill使用bitsandbytes库量化大语言模型,能在GPU内存有限时通过8位或4位量化减少50-75%内存占用,同时保持精度损失最小。它支持INT8、NF4、FP4等多种量化格式,可与HuggingFace Transformers无缝集成,适用于需要部署更大模型或加速推理的场景。还提供QLoRA训练和8位优化器支持,让开发者能轻松实现高效模型压缩。

查看技能

dispatching-parallel-agents

其他

该Skill用于并行处理3个以上无依赖关系的独立故障,可为每个问题域分派专属Claude代理同时执行调查修复。它通过并发处理多个独立问题显著提升故障排查效率,特别适用于测试文件、子系统等无共享状态的场景。

查看技能