orchestrate-ml-pipeline
About
This skill orchestrates end-to-end machine learning pipelines with Prefect or Airflow, covering DAG construction, task dependencies, scheduling, and retry logic. It integrates ML tools such as MLflow and DVC for production workflows. Use it to automate multi-stage ML processes from data ingestion to deployment, or to manage periodic model retraining.
Quick install
Claude Code
Recommended:
npx skills add pjt222/agent-almanac -a claude-code
/plugin add https://github.com/pjt222/agent-almanac
git clone https://github.com/pjt222/agent-almanac.git ~/.claude/skills/orchestrate-ml-pipeline
Documentation
Orchestrate ML Pipeline
See Extended Examples for complete configuration files and templates.
End-to-end ML pipelines: deps, scheduling, monitoring.
Use When
- Automate multi-step ML (ingest → deploy)
- Schedule periodic retraining
- Coord distributed data + training
- Complex stage deps
- Retry + failure recovery
- Monitor execution + alerting
- Orchestrate feature eng/train/eval/deploy
- Reproducible across envs
In
- Required: Pipeline components (ingest, preprocess, train, eval)
- Required: Framework (Prefect, Airflow, Kubeflow)
- Required: Python env w/ orchestration lib
- Optional: K8s cluster (distributed)
- Optional: MLflow tracking server
- Optional: DVC for data versioning
- Optional: Slack/email alerts
- Optional: Monitoring (Prometheus, Grafana)
Do
Step 1: Pick + install framework
Select + setup infra.
# Option 1: Prefect (modern, Pythonic, simpler)
pip install prefect
pip install prefect-aws prefect-dask prefect-docker
# Start Prefect server (local development)
prefect server start
# Or use Prefect Cloud (managed)
# ... (see EXAMPLES.md for complete implementation)
Docker Compose for Airflow:
# docker-compose.airflow.yml
version: '3.8'
x-airflow-common: &airflow-common
  image: apache/airflow:2.8.0
  environment:
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    # ... (see EXAMPLES.md for complete implementation)
→ Framework installed, UI accessible (Prefect http://localhost:4200, Airflow http://localhost:8080), DB initialized, scheduler running.
If err: ports (netstat -tulpn | grep 8080), DB conn, Redis for Celery, Python ≥3.8 for Airflow, Docker daemon for containerized, init logs.
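The port check above can also be done from Python, which helps where netstat is unavailable. A minimal sketch using only the standard library; the helper name port_in_use is hypothetical, and the ports are the UI defaults named above.

```python
import socket


def port_in_use(port: int, host: str = "127.0.0.1") -> bool:
    """Return True if something is already listening on host:port."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(1.0)
        # connect_ex returns 0 when the connection succeeds, i.e. a listener exists
        return sock.connect_ex((host, port)) == 0


if __name__ == "__main__":
    for name, port in {"Prefect UI": 4200, "Airflow UI": 8080}.items():
        state = "in use" if port_in_use(port) else "free"
        print(f"{name} port {port}: {state}")
```

"In use" before the framework starts points to a port conflict; "free" after startup means the server did not come up.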
Step 2: ML pipeline w/ Prefect
Flow w/ tasks per stage.
# prefect_ml_pipeline.py
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
import pandas as pd
import mlflow
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
# ... (see EXAMPLES.md for complete implementation)
Deploy + schedule:
# deploy_prefect.py
# Note: Deployment.build_from_flow is the Prefect 2.x API; Prefect 3 removed it
# in favor of flow.serve() / flow.deploy().
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule
from prefect_ml_pipeline import ml_training_pipeline
# Create deployment with schedule
deployment = Deployment.build_from_flow(
    flow=ml_training_pipeline,
    # ... (see EXAMPLES.md for complete implementation)
→ Flow runs all tasks in order, retries auto on failure, success = green in UI, MLflow logs, model registered + deployed.
If err: deps defined? MLflow accessible? Data paths correct? Circular deps? Task timeout? Prefect logs. Resources (mem/CPU)?
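Automatic retries boil down to "try again after a growing delay". The sketch below shows that logic without any framework, so it can be tested in isolation. It is not Prefect's implementation; in Prefect the same effect is normally configured through the task's retries and retry_delay_seconds options. The name run_with_retries and its parameters are hypothetical.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_retries(
    fn: Callable[[], T],
    retries: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn; on failure wait base_delay * 2**attempt, then try again."""
    attempt = 0
    while True:
        try:
            return fn()
        except Exception:
            if attempt == retries:
                raise  # retries exhausted: surface the last error
            sleep(base_delay * 2 ** attempt)  # 1s, 2s, 4s, ...
            attempt += 1
```

The sleep function is injectable so tests can record the delays instead of waiting.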
Step 3: ML pipeline w/ Airflow
DAG for prod ML.
# dags/ml_training_dag.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta
import mlflow
import pandas as pd
# ... (see EXAMPLES.md for complete implementation)
→ DAG in Airflow UI, scheduled runs on time, retries + alerts on failure, XCom passes between tasks, MLflow logs.
If err: syntax check (python dags/ml_training_dag.py), imports avail in env, XCom size (use file paths for big data), email config, scheduler running, task logs in UI.
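The XCom size advice amounts to passing a path between tasks, not the data. A framework-free sketch: the function names, file name, and sample rows are all hypothetical, and in Airflow the returned string would be the value stored in XCom.

```python
import csv
import tempfile
from pathlib import Path


def extract(output_dir: Path) -> str:
    """Write the dataset to disk and return only its path (small enough for XCom)."""
    path = output_dir / "raw.csv"
    with path.open("w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["feature", "label"])
        writer.writerows([[0.1, 0], [0.9, 1], [0.4, 0]])
    return str(path)


def count_rows(data_path: str) -> int:
    """Downstream task: reload the data from the path it received."""
    with open(data_path, newline="") as f:
        return sum(1 for _ in csv.DictReader(f))


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        data_path = extract(Path(tmp))  # the only value handed between tasks
        print(count_rows(data_path))
```

With workers on different machines, the path has to point at shared storage (for example S3 or a network volume) instead of a local temp directory.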
Step 4: Advanced features
Dynamic DAGs, branching, parallel.
# advanced_pipeline.py (Prefect)
from prefect import flow, task
from prefect.task_runners import DaskTaskRunner, ConcurrentTaskRunner
import time
@task
def process_shard(shard_id: int, data: list) -> dict:
    """Process data shard in parallel."""
    # ... (see EXAMPLES.md for complete implementation)
Airflow branching:
# Airflow branching with BranchPythonOperator
import pandas as pd
from airflow.operators.python import BranchPythonOperator

def check_data_quality(**context):
    """Decide which branch to take."""
    data_path = context['ti'].xcom_pull(key='data_path')
    df = pd.read_csv(data_path)
    # ... (see EXAMPLES.md for complete implementation)
→ Parallel tasks concurrent (faster), conditional branches by logic, dynamic gen works, Dask distributes.
If err: Dask cluster configured + accessible? task_runner specified? Branches return valid task IDs? Resource contention w/ parallel? Conditional logic correct?
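The fan-out pattern and the concurrency limit can be tried without a Dask cluster. The sketch below uses a standard-library thread pool as a stand-in for a Prefect task runner. The body of process_shard is a placeholder, not the implementation from EXAMPLES.md, and process_all is a hypothetical helper.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List


def process_shard(shard_id: int, data: List[int]) -> Dict[str, int]:
    """Placeholder work: summarize one shard."""
    return {"shard_id": shard_id, "rows": len(data), "total": sum(data)}


def process_all(shards: List[List[int]], max_workers: int = 4) -> List[Dict[str, int]]:
    """Process shards concurrently, never running more than max_workers at once."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map returns results in input order, regardless of completion order
        return list(pool.map(process_shard, range(len(shards)), shards))


if __name__ == "__main__":
    print(process_all([[1, 2], [3], [4, 5, 6]], max_workers=2))
```

Capping max_workers is the simplest guard against the resource contention mentioned above. Threads suit I/O-bound shards; CPU-bound work needs processes or a distributed runner.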
Step 5: Monitoring + alerting
Failure notifications + comprehensive monitoring.
# monitoring_integration.py
from prefect.blocks.notifications import SlackWebhook
from prefect import flow, task, get_run_logger
from prefect.context import FlowRunContext

# notify_failure must be defined before this decorator runs;
# Prefect calls task hooks with (task, task_run, state).
@task(on_failure=[notify_failure])
def critical_task():
    """Task with failure notification."""
    # ... (see EXAMPLES.md for complete implementation)
Airflow monitoring w/ sensors:
# Airflow SLA and monitoring
from datetime import timedelta
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.decorators import apply_defaults

# slack_alert_failure / slack_alert_success are callbacks defined in the full example
default_args = {
    'sla': timedelta(hours=4),  # Alert if task exceeds 4 hours
    'on_failure_callback': slack_alert_failure,
    'on_success_callback': slack_alert_success,
    # ... (see EXAMPLES.md for complete implementation)
→ Slack/email on failures, SLA breach = alert, custom metrics tracked, logs aggregated.
If err: Slack webhook correct? SMTP set? Notification blocks loaded? SLA reasonable? Network blocking?
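To debug a silent webhook, it helps to separate building the message from sending it. A minimal standard-library sketch, assuming a Slack incoming webhook that accepts a JSON body with a text field; send_failure_alert and post_json are hypothetical names, not the callbacks from the full example.

```python
import json
import urllib.request
from typing import Callable, Dict


def post_json(url: str, payload: Dict[str, str]) -> None:
    """Send payload as a JSON HTTP POST; raises on network or HTTP errors."""
    req = urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=10):
        pass


def send_failure_alert(
    task_name: str,
    error: Exception,
    webhook_url: str,
    send: Callable[[str, Dict[str, str]], None] = post_json,
) -> Dict[str, str]:
    """Build the alert message and hand it to the sender; return what was sent."""
    payload = {"text": f"Task {task_name} failed: {type(error).__name__}: {error}"}
    send(webhook_url, payload)
    return payload
```

Passing a fake send in tests verifies the message format without network access. A real call that raises here usually points to a wrong URL or blocked egress.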
Step 6: CI/CD for pipelines
Version + automate.
# .github/workflows/deploy-pipeline.yml
name: Deploy ML Pipeline
on:
  push:
    branches: [main]
    paths:
      - 'pipelines/**'
# ... (see EXAMPLES.md for complete implementation)
→ Tests pass before deploy, auto deploy to prod, team notified on success, versioning in Git.
If err: test coverage + failures, Prefect Cloud creds, deploy script error handling, Slack webhook, CI logs.
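One cheap check for the test stage is to validate the pipeline's dependency graph before deploying. A sketch with graphlib (standard library, Python 3.9+): the PIPELINE mapping is a hypothetical example built from the stage names used in this skill, and execution_order is a hypothetical helper, not part of Prefect or Airflow.

```python
from graphlib import CycleError, TopologicalSorter
from typing import Dict, List, Set

# task -> set of tasks it depends on (hypothetical example graph)
PIPELINE: Dict[str, Set[str]] = {
    "preprocess": {"ingest"},
    "train": {"preprocess"},
    "evaluate": {"train"},
    "deploy": {"evaluate"},
}


def execution_order(deps: Dict[str, Set[str]]) -> List[str]:
    """Return a valid run order, or raise ValueError if deps contain a cycle."""
    try:
        return list(TopologicalSorter(deps).static_order())
    except CycleError as exc:
        # exc.args[1] holds the list of nodes that form the cycle
        raise ValueError(f"circular dependency: {exc.args[1]}") from exc


if __name__ == "__main__":
    print(execution_order(PIPELINE))
```

Run as a unit test in CI, this fails the build on a circular dependency before the scheduler ever sees the pipeline.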
Check
- Framework installed + running
- DAG defined w/ correct deps
- All tasks in proper order
- Retry logic works
- Scheduled runs on time
- MLflow logs experiments
- DVC versions data
- Parallel tasks concurrent
- Conditional branches work
- Monitoring + alerting functional
- CI/CD auto deploys
- Reproducible across envs
Traps
- Circular deps: A→B, B→A → design DAG carefully, use validators
- Memory leaks: long tasks accumulate → timeouts, monitor, restart workers
- XCom size limits: passing big data → use file paths/S3, not direct serial
- Timezone confusion: wrong times → always UTC, explicit tz in schedule
- Missing retries: transient = perm fail → exponential backoff retries
- Tight coupling: deps on impl details → clear interfaces, explicit params
- No idempotency: re-run = dupes/errors → design idempotent (safe retry)
- Poor error handling: no context → detailed logs, capture exceptions
- Resource contention: parallel overwhelms → limit concurrency, quotas
- Version conflicts: incompatible deps per task → Docker containers for isolation
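The idempotency trap is the easiest of these to show concretely. The sketch below makes a task safe to re-run by skipping work whose output already exists and by writing atomically, so a crash never leaves a half-written file. The helper write_metrics_once and the metrics content are hypothetical.

```python
import json
import os
import tempfile
from pathlib import Path


def write_metrics_once(output: Path, metrics: dict) -> bool:
    """Write metrics to output unless it already exists. Return True if written."""
    if output.exists():
        return False  # an earlier run or retry already produced this output
    # Write to a temp file in the same directory, then rename it into place;
    # os.replace is atomic, so readers never see a partial file.
    fd, tmp_name = tempfile.mkstemp(dir=output.parent, suffix=".tmp")
    with os.fdopen(fd, "w") as f:
        json.dump(metrics, f)
    os.replace(tmp_name, output)
    return True


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        out = Path(tmp) / "metrics.json"
        print(write_metrics_once(out, {"accuracy": 0.9}))  # first run writes
        print(write_metrics_once(out, {"accuracy": 0.9}))  # retry is a no-op
```

Including the run date or run ID in the output path gives each scheduled run its own idempotent target.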
Related skills
- track-ml-experiments: MLflow tracking in tasks
- version-ml-data: DVC for data versioning
- build-feature-store: materialize features as task
- deploy-ml-model-serving: deploy as final stage
- deploy-to-kubernetes: run pipelines on K8s
