orchestrate-ml-pipeline
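About
This skill orchestrates end-to-end machine learning pipelines with Prefect or Airflow, handling DAG construction, task dependencies, scheduling, and retry logic. It integrates with ML tools such as MLflow and DVC to support production workflows. Use it to automate multi-step ML processes from data ingestion through deployment, or to manage periodic model retraining.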
Quick Install
Claude Code
Recommended: npx skills add pjt222/agent-almanac -a claude-code
/plugin add https://github.com/pjt222/agent-almanac
git clone https://github.com/pjt222/agent-almanac.git ~/.claude/skills/orchestrate-ml-pipeline
Copy and paste one of these commands to install the skill for Claude Code.
Documentation
Orchestrate ML Pipeline
See Extended Examples for complete configuration files and templates.
Orchestrate end-to-end ML pipelines: deps, scheduling, monitoring.
Use When
- Automate multi-step ML (ingest → deploy)
- Schedule periodic retraining
- Coordinate distributed data processing + training
- Manage complex stage deps
- Retry + failure recovery
- Monitor execution + alerting
- Orchestrate feature eng/train/eval/deploy
- Reproducible across envs
Inputs
- Required: Pipeline components (ingest, preprocess, train, eval)
- Required: Framework (Prefect, Airflow, Kubeflow)
- Required: Python env w/ orchestration lib
- Optional: K8s cluster (distributed)
- Optional: MLflow tracking server
- Optional: DVC for data versioning
- Optional: Slack/email alerts
- Optional: Monitoring (Prometheus, Grafana)
Do
Step 1: Pick + install framework
Select a framework + set up its infra.
# Option 1: Prefect (modern, Pythonic, simpler)
pip install prefect
pip install prefect-aws prefect-dask prefect-docker
# Start Prefect server (local development)
prefect server start
# Or use Prefect Cloud (managed)
# ... (see EXAMPLES.md for complete implementation)
Docker Compose for Airflow:
# docker-compose.airflow.yml
version: '3.8'
x-airflow-common: &airflow-common
  image: apache/airflow:2.8.0
  environment:
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    # ... (see EXAMPLES.md for complete implementation)
→ Framework installed, UI accessible (Prefect http://localhost:4200, Airflow http://localhost:8080), DB initialized, scheduler running.
If err: check port conflicts (netstat -tulpn | grep 8080), DB conn, Redis reachable for Celery, Python ≥3.8 for Airflow, Docker daemon running for containerized setups, init logs.
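The port check above can also be run from Python, which helps on hosts where netstat is not installed. This is a minimal stdlib sketch; the ports are the UI defaults named above, and the function name `port_in_use` is hypothetical.

```python
import socket


def port_in_use(port: int, host: str = "127.0.0.1", timeout: float = 0.5) -> bool:
    """Return True if something accepts TCP connections on host:port."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(timeout)
        # connect_ex returns 0 on success instead of raising an exception
        return sock.connect_ex((host, port)) == 0


if __name__ == "__main__":
    # Default UI ports mentioned in this step
    for name, port in {"Prefect UI": 4200, "Airflow UI": 8080}.items():
        state = "in use" if port_in_use(port) else "free"
        print(f"{name} port {port}: {state}")
```

"In use" before starting the framework points to a port conflict; "free" after starting it suggests the server did not come up.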
Step 2: ML pipeline w/ Prefect
Flow w/ tasks per stage.
# prefect_ml_pipeline.py
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
import pandas as pd
import mlflow
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
# ... (see EXAMPLES.md for complete implementation)
Deploy + schedule:
# deploy_prefect.py
# Note: Deployment.build_from_flow is the Prefect 2.x API and is not available in Prefect 3.
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule
from prefect_ml_pipeline import ml_training_pipeline

# Create deployment with schedule
deployment = Deployment.build_from_flow(
    flow=ml_training_pipeline,
    # ... (see EXAMPLES.md for complete implementation)
→ Flow runs all tasks in order, retries auto on failure, success = green in UI, MLflow logs, model registered + deployed.
If err: deps defined? MLflow accessible? Data paths correct? Circular deps? Task timeout? Prefect logs. Resources (mem/CPU)?
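Stage dependencies can be checked for cycles before anything is handed to the orchestrator. The sketch below is framework-agnostic and uses the standard-library `graphlib` (Python 3.9+); the stage names and the `execution_order` helper are hypothetical, not part of Prefect or Airflow.

```python
from graphlib import CycleError, TopologicalSorter


def execution_order(dependencies: dict[str, set[str]]) -> list[str]:
    """Return a valid run order, or raise ValueError if the graph has a cycle.

    `dependencies` maps each stage to the set of stages it depends on.
    """
    try:
        return list(TopologicalSorter(dependencies).static_order())
    except CycleError as exc:
        # exc.args[1] is the list of nodes that form the cycle
        raise ValueError(f"circular dependency: {' -> '.join(exc.args[1])}") from exc


# Hypothetical stage names for illustration
stages = {
    "ingest": set(),
    "preprocess": {"ingest"},
    "train": {"preprocess"},
    "evaluate": {"train"},
    "deploy": {"evaluate"},
}
print(execution_order(stages))
```

Running a check like this in a unit test catches an A→B, B→A mistake before deployment.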
Step 3: ML pipeline w/ Airflow
DAG for prod ML.
# dags/ml_training_dag.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.utils.dates import days_ago  # deprecated in Airflow 2.x; prefer a fixed datetime start_date
from datetime import datetime, timedelta
import mlflow
import pandas as pd
# ... (see EXAMPLES.md for complete implementation)
→ DAG in Airflow UI, scheduled runs on time, retries + alerts on failure, XCom passes between tasks, MLflow logs.
If err: syntax check (python dags/ml_training_dag.py), imports avail in env, XCom size (use file paths for big data), email config, scheduler running, task logs in UI.
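The XCom size advice amounts to returning a path instead of the data itself. Here is a minimal stdlib sketch of that hand-off, assuming both tasks can reach the same filesystem; the function names and the stand-in data are hypothetical. In Airflow, the return value of a PythonOperator callable is what gets pushed to XCom, so only the short path string would be stored.

```python
import csv
import tempfile
from pathlib import Path


def extract(output_dir: str) -> str:
    """Write rows to disk and return only the path (small and XCom-friendly)."""
    path = Path(output_dir) / "raw.csv"
    rows = [{"id": i, "value": i * i} for i in range(1000)]  # stand-in data
    with path.open("w", newline="") as fh:
        writer = csv.DictWriter(fh, fieldnames=["id", "value"])
        writer.writeheader()
        writer.writerows(rows)
    return str(path)


def count_rows(data_path: str) -> int:
    """Downstream step: reload the data from the path it was handed."""
    with open(data_path, newline="") as fh:
        return sum(1 for _ in csv.DictReader(fh))


with tempfile.TemporaryDirectory() as tmp:
    data_path = extract(tmp)         # upstream task returns a path
    n_rows = count_rows(data_path)   # downstream task receives the path
    print(n_rows)
```

With workers on separate machines, the same pattern applies with an object-store URI in place of a local path.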
Step 4: Advanced features
Dynamic DAGs, branching, parallel execution.
# advanced_pipeline.py (Prefect)
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner
from prefect_dask import DaskTaskRunner  # provided by the prefect-dask package installed in Step 1
import time

@task
def process_shard(shard_id: int, data: list) -> dict:
    """Process data shard in parallel."""
    # ... (see EXAMPLES.md for complete implementation)
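The shard fan-out can be illustrated without Prefect using `concurrent.futures`. This is a sketch only: the body of `process_shard` is a stand-in (the real one is elided above), and `split` and `run_parallel` are hypothetical helpers. `max_workers` plays the role of a concurrency limit.

```python
from concurrent.futures import ThreadPoolExecutor


def process_shard(shard_id: int, data: list) -> dict:
    """Stand-in shard processor: summarize the shard."""
    return {"shard_id": shard_id, "count": len(data), "total": sum(data)}


def split(items: list, n_shards: int) -> list[list]:
    """Distribute items round-robin across n_shards lists."""
    return [items[i::n_shards] for i in range(n_shards)]


def run_parallel(items: list, n_shards: int = 4, max_workers: int = 2) -> list[dict]:
    """Process shards concurrently, at most max_workers at a time."""
    shards = split(items, n_shards)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() returns results in shard order, regardless of completion order
        return list(pool.map(process_shard, range(n_shards), shards))


results = run_parallel(list(range(100)))
print(sum(r["total"] for r in results))
```

Keeping `max_workers` below the shard count is one simple way to avoid the resource contention listed under Traps.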
Airflow branching:
# Airflow branching with BranchPythonOperator
import pandas as pd
from airflow.operators.python import BranchPythonOperator

def check_data_quality(**context):
    """Decide which branch to take."""
    data_path = context['ti'].xcom_pull(key='data_path')
    df = pd.read_csv(data_path)
    # ... (see EXAMPLES.md for complete implementation)
→ Parallel tasks concurrent (faster), conditional branches by logic, dynamic gen works, Dask distributes.
If err: Dask cluster configured + accessible? task_runner specified? Branches return valid task IDs? Resource contention w/ parallel? Conditional logic correct?
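A branch callable has to return the task_id of a task that exists downstream, so it helps to keep the decision in a small pure function that can be unit-tested on its own. The thresholds and the task IDs `train_model` and `notify_bad_data` below are hypothetical; the original decision logic is elided.

```python
def choose_branch(
    null_fraction: float,
    row_count: int,
    max_null_fraction: float = 0.05,  # assumed threshold
    min_rows: int = 1000,             # assumed threshold
) -> str:
    """Return the task_id of the branch to follow."""
    if row_count >= min_rows and null_fraction <= max_null_fraction:
        return "train_model"      # hypothetical downstream task_id
    return "notify_bad_data"      # hypothetical downstream task_id


print(choose_branch(null_fraction=0.01, row_count=5000))
print(choose_branch(null_fraction=0.20, row_count=5000))
```

Inside `check_data_quality`, the two metrics would be computed from the DataFrame and passed to a function like this one.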
Step 5: Monitoring + alerting
Failure notifications + comprehensive monitoring.
# monitoring_integration.py
from prefect.blocks.notifications import SlackWebhook
from prefect import flow, task, get_run_logger
from prefect.context import FlowRunContext

# notify_failure must be defined before this decorator runs (definition elided here)
@task(on_failure=[notify_failure])
def critical_task():
    """Task with failure notification."""
    # ... (see EXAMPLES.md for complete implementation)
Airflow monitoring w/ sensors:
# Airflow SLA and monitoring
from datetime import timedelta
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.decorators import apply_defaults  # deprecated: no longer needed in Airflow 2.x

default_args = {
    'sla': timedelta(hours=4),  # SLA miss if the task has not finished within 4 hours of the DAG run start
    'on_failure_callback': slack_alert_failure,
    'on_success_callback': slack_alert_success,
    # ... (see EXAMPLES.md for complete implementation)
→ Slack/email on failures, SLA breach = alert, custom metrics tracked, logs aggregated.
If err: Slack webhook correct? SMTP set? Notification blocks loaded? SLA reasonable? Network blocking?
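To debug a webhook independently of the orchestrator, the alert can be built and posted with the standard library alone. This sketch assumes a Slack incoming webhook, which accepts a JSON body with a `text` field; the function names and the `train_model` task name are hypothetical, and the webhook URL must be supplied by the caller.

```python
import json
import urllib.request


def build_failure_message(flow_name: str, task_name: str, error: str) -> dict:
    """Build the JSON payload for a Slack incoming webhook."""
    return {"text": f":red_circle: {flow_name} / {task_name} failed: {error}"}


def send_to_slack(webhook_url: str, message: dict, timeout: float = 10.0) -> int:
    """POST the payload to the webhook and return the HTTP status code."""
    request = urllib.request.Request(
        webhook_url,
        data=json.dumps(message).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status


# Build only; nothing is sent unless send_to_slack is called with a real URL
message = build_failure_message("ml_training_pipeline", "train_model", "out of memory")
print(json.dumps(message))
```

If `send_to_slack` works from the worker host but alerts still do not arrive, the problem is more likely in the callback wiring than in the network or the webhook.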
Step 6: CI/CD for pipelines
Version + automate.
# .github/workflows/deploy-pipeline.yml
name: Deploy ML Pipeline
on:
  push:
    branches: [main]
    paths:
      - 'pipelines/**'
# ... (see EXAMPLES.md for complete implementation)
→ Tests pass before deploy, auto deploy to prod, team notified on success, versioning in Git.
If err: test coverage + failures, Prefect Cloud creds, deploy script error handling, Slack webhook, CI logs.
Check
- Framework installed + running
- DAG defined w/ correct deps
- All tasks in proper order
- Retry logic works
- Scheduled runs on time
- MLflow logs experiments
- DVC versions data
- Parallel tasks concurrent
- Conditional branches work
- Monitoring + alerting functional
- CI/CD auto deploys
- Reproducible across envs
Traps
- Circular deps: A→B, B→A → design DAG carefully, use validators
- Memory leaks: long-running tasks accumulate memory → set timeouts, monitor, restart workers
- XCom size limits: passing big data through XCom → pass file paths/S3 URIs, not serialized data
- Timezone confusion: runs fire at wrong times → always use UTC, explicit tz in schedule
- Missing retries: transient errors become permanent failures → retry w/ exponential backoff
- Tight coupling: deps on impl details → clear interfaces, explicit params
- No idempotency: re-run = dupes/errors → design idempotent (safe retry)
- Poor error handling: no context → detailed logs, capture exceptions
- Resource contention: parallel overwhelms → limit concurrency, quotas
- Version conflicts: incompatible deps per task → Docker containers for isolation
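The retry advice above can be sketched as a plain decorator. Both Prefect and Airflow ship their own retry settings, so this only illustrates the backoff arithmetic. The decorator name, its parameters, and `flaky_download` are hypothetical, and `sleep` is injectable so the demo records delays instead of waiting.

```python
import time
from functools import wraps


def retry(max_attempts: int = 3, base_delay: float = 1.0, factor: float = 2.0,
          exceptions: tuple = (Exception,), sleep=time.sleep):
    """Retry the wrapped function, multiplying the delay by `factor` each time."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions:
                    if attempt == max_attempts:
                        raise  # out of attempts: surface the original error
                    sleep(base_delay * factor ** (attempt - 1))
        return wrapper
    return decorator


delays = []        # records requested delays instead of sleeping
calls = {"n": 0}   # counts how many times the task body ran


@retry(max_attempts=4, base_delay=1.0, sleep=delays.append)
def flaky_download() -> str:
    """Fail twice with a transient error, then succeed."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient failure")
    return "ok"


result = flaky_download()
print(result, delays)
```

Retries are only safe when the task is idempotent, which is why the two traps are listed together.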
See Also
- track-ml-experiments: MLflow tracking in tasks
- version-ml-data: DVC for data versioning
- build-feature-store: materialize features as task
- deploy-ml-model-serving: deploy as final stage
- deploy-to-kubernetes: run pipelines on K8s
