orchestrate-ml-pipeline
About
This skill orchestrates end-to-end machine learning pipelines using Prefect or Airflow, handling DAG construction, task dependencies, scheduling, and retry logic. It integrates with ML tools like MLflow and DVC for production workflows. Use it when automating multi-step ML processes from data ingestion to deployment or managing periodic model retraining.
Quick install
Claude Code
Recommended: npx skills add pjt222/agent-almanac -a claude-code
/plugin add https://github.com/pjt222/agent-almanac
git clone https://github.com/pjt222/agent-almanac.git ~/.claude/skills/orchestrate-ml-pipeline
Copy and paste the command into Claude Code to install the skill.
Skill documentation
Orchestrate ML Pipeline
See Extended Examples for complete configuration files and templates.
End-to-end ML pipelines: deps, scheduling, monitoring.
Use When
- Automate multi-step ML (ingest → deploy)
- Schedule periodic retraining
- Coordinate distributed data processing + training
- Complex stage deps
- Retry + failure recovery
- Monitor execution + alerting
- Orchestrate feature eng/train/eval/deploy
- Reproducible across envs
Inputs
- Required: Pipeline components (ingest, preprocess, train, eval)
- Required: Framework (Prefect, Airflow, Kubeflow)
- Required: Python env w/ orchestration lib
- Optional: K8s cluster (distributed)
- Optional: MLflow tracking server
- Optional: DVC for data versioning
- Optional: Slack/email alerts
- Optional: Monitoring (Prometheus, Grafana)
Do
Step 1: Pick + install framework
Select + setup infra.
# Option 1: Prefect (modern, Pythonic, simpler)
pip install prefect
pip install prefect-aws prefect-dask prefect-docker
# Start Prefect server (local development)
prefect server start
# Or use Prefect Cloud (managed)
# ... (see EXAMPLES.md for complete implementation)
Docker Compose for Airflow:
# docker-compose.airflow.yml
version: '3.8'
x-airflow-common: &airflow-common
  image: apache/airflow:2.8.0
  environment:
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
# ... (see EXAMPLES.md for complete implementation)
→ Framework installed, UI accessible (Prefect http://localhost:4200, Airflow http://localhost:8080), DB initialized, scheduler running.
If err: ports (netstat -tulpn | grep 8080), DB conn, Redis for Celery, Python ≥3.8 for Airflow, Docker daemon for containerized, init logs.
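The port check can also be scripted when netstat is not available. This is a minimal sketch, assuming the default local ports named in this step (4200 for Prefect, 8080 for Airflow); `port_in_use` is a hypothetical helper, not part of either framework.

```python
import socket


def port_in_use(port: int, host: str = "127.0.0.1") -> bool:
    """Return True if something is already listening on host:port."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(0.5)
        # connect_ex returns 0 when the connection succeeds, i.e. a listener exists
        return sock.connect_ex((host, port)) == 0


if __name__ == "__main__":
    # Assumed defaults from this step; adjust if the UIs run elsewhere
    for name, port in {"Prefect UI": 4200, "Airflow UI": 8080}.items():
        status = "in use" if port_in_use(port) else "free"
        print(f"{name} port {port}: {status}")
```

A port reported as "in use" before the framework starts usually points to a conflicting process. After startup, "in use" is the expected state.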
Step 2: ML pipeline w/ Prefect
Flow w/ tasks per stage.
# prefect_ml_pipeline.py
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
import pandas as pd
import mlflow
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
# ... (see EXAMPLES.md for complete implementation)
Deploy + schedule:
# deploy_prefect.py
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule
from prefect_ml_pipeline import ml_training_pipeline
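# Create deployment with schedule
# Note: Deployment.build_from_flow is the Prefect 2.x API; Prefect 3 replaces it with flow.deploy() / flow.serve()
deployment = Deployment.build_from_flow(
    flow=ml_training_pipeline,
    # ... (see EXAMPLES.md for complete implementation)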
→ Flow runs all tasks in order, retries auto on failure, success = green in UI, MLflow logs, model registered + deployed.
If err: deps defined? MLflow accessible? Data paths correct? Circular deps? Task timeout? Prefect logs. Resources (mem/CPU)?
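To make the "runs in order, retries on failure" behavior concrete, here is a framework-agnostic sketch in plain Python. It is not the pipeline from EXAMPLES.md: `with_retries`, `flaky_ingest`, and `run_pipeline` are hypothetical names. In Prefect itself the same behavior is requested through the `retries` and `retry_delay_seconds` arguments of `@task`.

```python
import time
from functools import wraps


def with_retries(max_retries: int = 3, base_delay: float = 1.0, sleep=time.sleep):
    """Retry a function on any exception, doubling the wait after each failure."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == max_retries:
                        raise  # retries exhausted: surface the original error
                    sleep(base_delay * 2 ** attempt)  # exponential backoff
        return wrapper
    return decorator


calls = {"count": 0}


@with_retries(max_retries=3, base_delay=0.01)
def flaky_ingest() -> str:
    """Hypothetical ingest stage that fails twice before succeeding."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("transient failure")
    return "raw_data.csv"


def run_pipeline() -> list:
    """Run stages in order, feeding each output into the next stage."""
    data_path = flaky_ingest()
    features = f"features_from:{data_path}"  # stand-in for preprocessing
    model = f"model_from:{features}"         # stand-in for training
    return [data_path, features, model]
```

Because each stage consumes the previous stage's return value, the execution order follows from the data flow. Prefect infers task dependencies the same way.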
Step 3: ML pipeline w/ Airflow
DAG for prod ML.
# dags/ml_training_dag.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.utils.dates import days_ago  # deprecated in Airflow 2.x; an explicit datetime start_date is preferred
from datetime import datetime, timedelta
import mlflow
import pandas as pd
# ... (see EXAMPLES.md for complete implementation)
→ DAG in Airflow UI, scheduled runs on time, retries + alerts on failure, XCom passes between tasks, MLflow logs.
If err: syntax check (python dags/ml_training_dag.py), imports avail in env, XCom size (use file paths for big data), email config, scheduler running, task logs in UI.
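The XCom size advice (pass file paths, not the data itself) can be sketched without Airflow. In this minimal illustration a plain dict stands in for the XCom store, and `extract` and `summarize` are hypothetical task bodies.

```python
import csv
import tempfile
from pathlib import Path


def extract(work_dir: Path) -> str:
    """Write the dataset to disk and return only its path (small, XCom-friendly)."""
    out_path = work_dir / "raw.csv"
    with out_path.open("w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["id", "value"])
        writer.writerows((i, i * 2) for i in range(1000))
    return str(out_path)


def summarize(data_path: str) -> dict:
    """Load the data from the path handed over by the upstream task."""
    with open(data_path, newline="") as f:
        rows = list(csv.DictReader(f))
    return {"rows": len(rows), "total": sum(int(r["value"]) for r in rows)}


with tempfile.TemporaryDirectory() as tmp:
    xcom = {}  # stand-in for Airflow's XCom store
    xcom["data_path"] = extract(Path(tmp))   # upstream pushes a short string
    summary = summarize(xcom["data_path"])   # downstream pulls it and reads the file
```

Only the path string crosses the task boundary, so the metadata database never holds the dataset. On a multi-worker deployment the path would need to point at shared storage such as S3.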
Step 4: Advanced features
Dynamic DAGs, branching, parallel.
# advanced_pipeline.py (Prefect)
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner
from prefect_dask import DaskTaskRunner  # provided by the prefect-dask package installed in Step 1
import time
@task
def process_shard(shard_id: int, data: list) -> dict:
    """Process data shard in parallel."""
    # ... (see EXAMPLES.md for complete implementation)
Airflow branching:
# Airflow branching with BranchPythonOperator
import pandas as pd
from airflow.operators.python import BranchPythonOperator
def check_data_quality(**context):
    """Decide which branch to take."""
    data_path = context['ti'].xcom_pull(key='data_path')
    df = pd.read_csv(data_path)
    # ... (see EXAMPLES.md for complete implementation)
→ Parallel tasks concurrent (faster), conditional branches by logic, dynamic gen works, Dask distributes.
If err: Dask cluster configured + accessible? task_runner specified? Branches return valid task IDs? Resource contention w/ parallel? Conditional logic correct?
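The two ideas in this step, bounded parallelism over shards and a branch function that returns a valid task ID, can be sketched with the standard library alone. This is an illustration under assumptions, not the Prefect or Airflow implementation: the task IDs in `VALID_BRANCHES` and the 10% null threshold are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor


def process_shard(shard_id: int, data: list) -> dict:
    """Process one shard; here the 'work' is just summing the values."""
    return {"shard_id": shard_id, "total": sum(data)}


def run_shards(shards: list, max_workers: int = 4) -> list:
    """Process shards concurrently, capping parallelism at max_workers."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(process_shard, i, shard) for i, shard in enumerate(shards)]
        return [f.result() for f in futures]  # results stay in submission order


VALID_BRANCHES = {"train_model", "alert_bad_data"}  # hypothetical downstream task IDs


def choose_branch(null_fraction: float, threshold: float = 0.1) -> str:
    """Return the ID of the task to run next, as a branching operator would."""
    branch = "train_model" if null_fraction <= threshold else "alert_bad_data"
    if branch not in VALID_BRANCHES:
        raise ValueError(f"unknown branch: {branch}")  # a branch must name an existing task
    return branch


results = run_shards([[1, 2, 3], [4, 5], [6]])
```

The `max_workers` cap plays the role of a concurrency limit: raising it speeds up I/O-bound shards until the workers start competing for the same resources.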
Step 5: Monitoring + alerting
Failure notifications + comprehensive monitoring.
# monitoring_integration.py
from prefect.blocks.notifications import SlackWebhook
from prefect import flow, task, get_run_logger
from prefect.context import FlowRunContext
# notify_failure is a failure hook that must be defined before this decorator runs (elided here)
@task(on_failure=[notify_failure])
def critical_task():
    """Task with failure notification."""
    # ... (see EXAMPLES.md for complete implementation)
Airflow monitoring w/ sensors:
# Airflow SLA and monitoring
from datetime import timedelta
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.decorators import apply_defaults  # deprecated in Airflow 2.x, where operators no longer need it
default_args = {
    'sla': timedelta(hours=4),  # Alert if the task has not succeeded within 4 hours of the DAG run start
    'on_failure_callback': slack_alert_failure,
    'on_success_callback': slack_alert_success,
    # ... (see EXAMPLES.md for complete implementation)
→ Slack/email on failures, SLA breach = alert, custom metrics tracked, logs aggregated.
If err: Slack webhook correct? SMTP set? Notification blocks loaded? SLA reasonable? Network blocking?
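A failure hook reduces to "catch, notify, re-raise". The sketch below shows that shape without depending on Prefect or Airflow. All function names are hypothetical, and the only external assumption is that a Slack incoming webhook accepts a JSON body with a `text` field.

```python
import json
import urllib.request


def build_failure_message(task_name: str, error: Exception, run_id: str) -> dict:
    """Build a Slack incoming-webhook payload describing a failed task."""
    detail = f"{type(error).__name__}: {error}"
    return {"text": f":x: Task `{task_name}` failed in run `{run_id}`: {detail}"}


def post_to_slack(webhook_url: str, payload: dict) -> None:
    """POST the payload as JSON; webhook_url is supplied by the caller."""
    request = urllib.request.Request(
        webhook_url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    urllib.request.urlopen(request, timeout=10)


def run_with_alert(task, task_name: str, run_id: str, notify) -> None:
    """Run a task; on failure call notify(payload), then re-raise the error."""
    try:
        task()
    except Exception as error:
        notify(build_failure_message(task_name, error, run_id))
        raise  # keep the run marked as failed so retries and SLAs still apply
```

In production `notify` would be something like `lambda payload: post_to_slack(url, payload)`. Keeping it injectable lets the alert path be tested without network access, which helps when debugging a webhook that silently fails.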
Step 6: CI/CD for pipelines
Version + automate.
# .github/workflows/deploy-pipeline.yml
name: Deploy ML Pipeline
on:
push:
branches: [main]
paths:
- 'pipelines/**'
# ... (see EXAMPLES.md for complete implementation)
→ Tests pass before deploy, auto deploy to prod, team notified on success, versioning in Git.
If err: test coverage + failures, Prefect Cloud creds, deploy script error handling, Slack webhook, CI logs.
Check
- Framework installed + running
- DAG defined w/ correct deps
- All tasks in proper order
- Retry logic works
- Scheduled runs on time
- MLflow logs experiments
- DVC versions data
- Parallel tasks concurrent
- Conditional branches work
- Monitoring + alerting functional
- CI/CD auto deploys
- Reproducible across envs
Traps
- Circular deps: A→B, B→A → design DAG carefully, use validators
- Memory leaks: long-running tasks accumulate memory → set timeouts, monitor usage, restart workers
- XCom size limits: passing big data → use file paths/S3, not direct serialization
- Timezone confusion: wrong times → always UTC, explicit tz in schedule
- Missing retries: transient errors become permanent failures → retry w/ exponential backoff
- Tight coupling: deps on impl details → clear interfaces, explicit params
- No idempotency: re-run = dupes/errors → design idempotent (safe retry)
- Poor error handling: no context → detailed logs, capture exceptions
- Resource contention: parallel overwhelms → limit concurrency, quotas
- Version conflicts: incompatible deps per task → Docker containers for isolation
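The "use validators" advice for circular deps can be as small as a topological sort that fails loudly. This is a minimal sketch using the standard-library `graphlib` module (Python 3.9+); the stage names and the `execution_order` helper are hypothetical.

```python
from graphlib import CycleError, TopologicalSorter


def execution_order(dependencies: dict) -> list:
    """Return a valid run order, or raise ValueError if the graph has a cycle.

    dependencies maps each task to the set of tasks it depends on.
    """
    try:
        return list(TopologicalSorter(dependencies).static_order())
    except CycleError as exc:
        cycle = " -> ".join(exc.args[1])  # args[1] holds the nodes forming the cycle
        raise ValueError(f"circular dependency: {cycle}") from exc


pipeline = {
    "preprocess": {"ingest"},
    "train": {"preprocess"},
    "evaluate": {"train"},
    "deploy": {"evaluate"},
}
order = execution_order(pipeline)
```

Running a check like this in CI catches an A→B, B→A mistake before the scheduler ever loads the pipeline.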
Related
- track-ml-experiments: MLflow tracking in tasks
- version-ml-data: DVC for data versioning
- build-feature-store: materialize features as task
- deploy-ml-model-serving: deploy as final stage
- deploy-to-kubernetes: run pipelines on K8s
