Zurück zu Fähigkeiten

orchestrate-ml-pipeline

pjt222
Aktualisiert Yesterday
10 Ansichten
17
2
17
Auf GitHub ansehen
Andereaiautomationdata

Über

Diese Fähigkeit orchestriert End-to-End-Maschinelles-Lernen-Pipelines mit Prefect oder Airflow, verwaltet DAG-Konstruktion, Aufgabenabhängigkeiten, Zeitplanung und Wiederholungslogik. Sie integriert mit MLflow, DVC und Feature-Stores für Produktions-Workflows. Nutzen Sie sie bei der Automatisierung mehrstufiger ML-Prozesse von Datenerfassung bis Bereitstellung oder beim Verwalten geplanter Nachschulungen und Fehlerbehebung.

Schnellinstallation

Claude Code

Empfohlen
Primär
npx skills add pjt222/agent-almanac -a claude-code
Plugin-BefehlAlternativ
/plugin add https://github.com/pjt222/agent-almanac
Git CloneAlternativ
git clone https://github.com/pjt222/agent-almanac.git ~/.claude/skills/orchestrate-ml-pipeline

Kopieren Sie diesen Befehl und fügen Sie ihn in Claude Code ein, um diese Fähigkeit zu installieren

Dokumentation

編 ML 管道

詳例見 Extended Examples

以依管、排程、與察建並編端至端 ML 管道。

用時

  • 自動多步 ML 流自據入至部署乃用
  • 新據上排定期模型重訓乃用
  • 分布據處與訓任務之協調乃用
  • 行 ML 管階間複依乃用
  • 重試邏輯與失敗復管乃用
  • 察管道行與失敗警乃用
  • 編特徵工、訓、評、部之諸階乃用
  • 跨環境建可復 ML 流乃用

  • 必要:ML 管件(據入、預處、訓、評)
  • 必要:編框選(Prefect、Airflow、Kubeflow)
  • 必要:裝編庫之 Python 環
  • 可選:Kubernetes 群為分布行
  • 可選:MLflow 追蹤服以記實驗
  • 可選:DVC 為據版本
  • 可選:Slack/郵以警
  • 可選:察基(Prometheus、Grafana)

第一步:選並裝編框

擇宜框並設基設。

# Option 1: Prefect (modern, Pythonic, simpler)
pip install prefect
pip install prefect-aws prefect-dask prefect-docker

# Start Prefect server (local development)
prefect server start

# Or use Prefect Cloud (managed)
# ... (see EXAMPLES.md for complete implementation)

Airflow 之 Docker Compose:

# docker-compose.airflow.yml
version: '3.8'

x-airflow-common: &airflow-common
  image: apache/airflow:2.8.0
  environment:
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
# ... (see EXAMPLES.md for complete implementation)

得: 編框已裝,網 UI 可達(Prefect 於 http://localhost:4200,Airflowhttp://localhost:8080),庫已初,排程行。

敗則: 察端可用(netstat -tulpn | grep 8080),驗庫連,確 Celery 之 Redis 行,察 Python 版合(Airflow 須 ≥3.8),驗容 Docker 守護,閱日察初誤。

第二步:以 Prefect 建 ML 管道

建 Prefect flow,各階為 task。

# prefect_ml_pipeline.py
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
import pandas as pd
import mlflow
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
# ... (see EXAMPLES.md for complete implementation)

部並排:

# deploy_prefect.py
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule
from prefect_ml_pipeline import ml_training_pipeline

# Create deployment with schedule
deployment = Deployment.build_from_flow(
    flow=ml_training_pipeline,
# ... (see EXAMPLES.md for complete implementation)

得: Prefect flow 諸 task 依序行,task 失敗自重試,成行於 UI 現綠,MLflow 記實驗,模型已註並部。

敗則: 察 task 依正定,驗 MLflow 服可達,確據源徑正,察循依,驗 task 超時,閱 Prefect 日誌詳誤,察資(記/CPU)。

第三步:以 Airflow 建 ML 管道

建 Airflow DAG 為生產 ML 流。

# dags/ml_training_dag.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta
import mlflow
import pandas as pd
# ... (see EXAMPLES.md for complete implementation)

得: DAG 現於 Airflow UI,排行按時,task 失敗觸重試與警,XCom 傳據於 task 間,MLflow 整合記實驗。

敗則: 察 DAG 文語法(python dags/ml_training_dag.py),驗入於 Airflow 環可得,確 XCom 不逾大限(大據用文徑),察郵設供警,驗排程行,閱 Airflow UI 中 task 日。

第四步:行進階能

加動 DAG、分支、平行行。

# advanced_pipeline.py (Prefect)
from prefect import flow, task
from prefect.task_runners import DaskTaskRunner, ConcurrentTaskRunner
import time

@task
def process_shard(shard_id: int, data: list) -> dict:
    """Process data shard in parallel."""
# ... (see EXAMPLES.md for complete implementation)

Airflow 分支:

# Airflow branching with BranchPythonOperator
from airflow.operators.python import BranchPythonOperator

def check_data_quality(**context):
    """Decide which branch to take."""
    data_path = context['ti'].xcom_pull(key='data_path')
    df = pd.read_csv(data_path)

# ... (see EXAMPLES.md for complete implementation)

得: 平行 task 並發行(管道更快),條件分支依邏行,動 task 生行,Dask 群分工。

敗則: 察 Dask 群已設可達,驗 task_runner 已指,確分支返有效 task ID,察平行 task 之資爭,驗條件邏正。

第五步:合察與警

加全察與失敗通。

# monitoring_integration.py
from prefect.blocks.notifications import SlackWebhook
from prefect import flow, task, get_run_logger
from prefect.context import FlowRunContext

@task(on_failure=[notify_failure])
def critical_task():
    """Task with failure notification."""
# ... (see EXAMPLES.md for complete implementation)

Airflow 之 sensor 察:

# Airflow SLA and monitoring
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.decorators import apply_defaults

default_args = {
    'sla': timedelta(hours=4),  # Alert if task exceeds 4 hours
    'on_failure_callback': slack_alert_failure,
    'on_success_callback': slack_alert_success,
# ... (see EXAMPLES.md for complete implementation)

得: Slack/郵通於失敗送,SLA 違觸警,自度量追,日誌聚於察系。

敗則: 驗 Slack webhook 正設,察郵 SMTP 設,確通塊已正載,驗 SLA 值合理,察阻通之網問。

第六步:行管道之 CI/CD

版控並自動管道部。

# .github/workflows/deploy-pipeline.yml
name: Deploy ML Pipeline

on:
  push:
    branches: [main]
    paths:
      - 'pipelines/**'
# ... (see EXAMPLES.md for complete implementation)

得: 部前管試過,自動部生產,成部告隊,管版於 Git 追。

敗則: 察試覆與失敗,驗 Prefect Cloud 憑,確部腳本處誤,察 Slack webhook 設,閱 CI 日察部誤。

  • 編框已裝且行
  • 管道 DAG 定,依正
  • 諸 task 依正序行
  • 失敗時重試邏輯正
  • 排行按時行
  • MLflow 整合記實驗
  • DVC 整合記據版本
  • 平行 task 並發行
  • 條件分支正
  • 察與警可行
  • CI/CD 自動部
  • 管道跨環可復

  • 循依:A 依 B,B 依 A——慎設 DAG 構,用 Airflow/Prefect 驗器
  • 記漏:長 task 累記——設 task 超時,察資用,定期重啟工
  • XCom 大限:以 XCom 傳大據——用文徑或外存(S3)非直序
  • 時區混:誤時排——必用 UTC,明設排程時區
  • 無重試:暫誤致 task 永敗——設指數退之重試
  • 緊耦:task 直依實現——用清接,明傳參
  • 無冪等:重行致重複或誤——設冪等之 task(重試安)
  • 誤處差:失敗無有用脈——加詳日誌,正捕異
  • 資爭:平行 task 壓資——限並發,設資配額
  • 版衝:諸 task 須不容依——用 Docker 容隔離

  • track-ml-experiments - 合 MLflow 追於管道 task
  • version-ml-data - 用 DVC 為管中據版本
  • build-feature-store - 物化特徵為管 task
  • deploy-ml-model-serving - 加部為終管階
  • deploy-to-kubernetes - 於 Kubernetes 行編管道

GitHub Repository

pjt222/agent-almanac
Pfad: i18n/wenyan/skills/orchestrate-ml-pipeline
0
agentsagentskillsai-assisted-developmentclaude-codeskillsteams

Verwandte Skills

llamaguard

Andere

LlamaGuard ist Metas 7-8B-Parameter-Modell zur Moderation von LLM-Eingaben und -Ausgaben in sechs Sicherheitskategorien wie Gewalt und Hassrede. Es bietet eine Genauigkeit von 94-95 % und kann mit vLLM, Hugging Face oder Amazon SageMaker eingesetzt werden. Nutzen Sie diese Skill, um Inhaltsfilterung und Sicherheitsguardrails einfach in Ihre KI-Anwendungen zu integrieren.

Skill ansehen

cost-optimization

Andere

Diese Claude Skill unterstützt Entwickler bei der Optimierung von Cloud-Kosten durch Ressourcen-Dimensionierung, Tagging-Strategien und Ausgabenanalysen. Sie bietet einen Rahmen zur Senkung von Cloud-Ausgaben und zur Implementierung von Kosten-Governance für AWS, Azure und GCP. Nutzen Sie sie, wenn Sie Infrastrukturkosten analysieren, Ressourcen richtig dimensionieren oder Budgetvorgaben einhalten müssen.

Skill ansehen

quantizing-models-bitsandbytes

Andere

Diese Fähigkeit quantisiert LLMs auf 8-Bit- oder 4-Bit-Präzision mittels bitsandbytes und erreicht dabei eine Speicherreduzierung von 50–75 % bei minimalem Genauigkeitsverlust. Sie ist ideal für den Betrieb größerer Modelle mit begrenztem GPU-Speicher oder zur Beschleunigung von Inferenzvorgängen und unterstützt Formate wie INT8, NF4 und FP4. Die Fähigkeit integriert sich in HuggingFace Transformers und ermöglicht QLoRA-Training sowie 8-Bit-Optimierer.

Skill ansehen

dispatching-parallel-agents

Andere

Diese Claude-Fähigkeit verteilt mehrere Agenten, um drei oder mehr unabhängige Probleme gleichzeitig zu untersuchen und zu beheben. Sie ist für Szenarien konzipiert, die unabhängige Fehler umfassen, die ohne gemeinsamen Zustand oder Abhängigkeiten gelöst werden können. Die Kernfähigkeit ist die parallele Problemlösung, bei der pro unabhängigem Problembereich ein Agent zugewiesen wird, um die Effizienz zu maximieren.

Skill ansehen