moai-domain-data-science
About
This Claude Skill provides production-grade data science capabilities using TensorFlow, PyTorch, and Scikit-learn. It covers end-to-end ML workflows, from data processing and statistical analysis through model development to deployment. Use this skill for building complete data science solutions with comprehensive experimentation and visualization tools.
Quick Install
Claude Code
Recommended (Claude Code plugin):
/plugin add https://github.com/modu-ai/moai-adk
Manual install:
git clone https://github.com/modu-ai/moai-adk.git ~/.claude/skills/moai-domain-data-science
Copy and paste one of these commands in Claude Code to install this skill.
Documentation
Data Science & Analytics
Level 1: Quick Reference
Core Capabilities
- Data Processing: Pandas 2.2.0, NumPy 1.26.0, Dask 2024.1.0
- Machine Learning: TensorFlow 2.20.0, PyTorch 2.9.0, Scikit-learn 1.7.2
- Visualization: Matplotlib 3.8.0, Seaborn 0.13.0, Plotly 5.17.0
- Statistics: SciPy 1.12.0, Statsmodels 0.14.0, Pingouin 0.8.0
- Big Data: Spark 3.5.0, Polars 0.20.0, Apache Arrow 14.0.0
- Experimentation: MLflow 2.9.0, Weights & Biases, Neptune
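Library versions move quickly, so before running the examples below it can help to confirm what is actually installed. A minimal sketch, assuming the standard PyPI distribution names:
# Environment check: print installed versions of the core stack (illustrative, not exhaustive)
from importlib.metadata import version, PackageNotFoundError
for pkg in ["pandas", "numpy", "scikit-learn", "scipy", "statsmodels",
            "matplotlib", "seaborn", "plotly", "mlflow"]:
    try:
        print(f"{pkg}: {version(pkg)}")
    except PackageNotFoundError:
        print(f"{pkg}: not installed")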
Quick Setup Examples
# Data science workflow starter
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, confusion_matrix
# Load and explore data
df = pd.read_csv('data.csv')
print(f"Dataset shape: {df.shape}")
print(f"Missing values: {df.isnull().sum().sum()}")
# Basic preprocessing
X = df.drop('target', axis=1)
y = df['target']
# Split data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# Train model
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)
# Evaluate
y_pred = model.predict(X_test)
print(classification_report(y_test, y_pred))
# Modern data visualization with Plotly
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
# Interactive scatter plot
fig = px.scatter(df, x='feature1', y='feature2', color='target',
size='feature3', hover_data=['feature4'])
fig.show()
# Multi-subplot dashboard
fig = make_subplots(
rows=2, cols=2,
subplot_titles=('Distribution', 'Correlation', 'Time Series', 'Feature Importance')
)
# Add traces...
fig.show()
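The dashboard above is left empty by the "# Add traces..." placeholder. A minimal sketch of one way to populate it, reusing the df, X, and model objects from the starter example above and assuming the same illustrative column names:
# Illustrative only: fill the 2x2 dashboard defined above
corr = df.select_dtypes(include='number').corr()
fig.add_trace(go.Histogram(x=df['feature1'], name='feature1'), row=1, col=1)
fig.add_trace(go.Heatmap(z=corr.values, x=list(corr.columns), y=list(corr.columns)), row=1, col=2)
fig.add_trace(go.Scatter(y=df['feature2'], mode='lines', name='feature2'), row=2, col=1)
fig.add_trace(go.Bar(x=list(X.columns), y=model.feature_importances_, name='importance'), row=2, col=2)
fig.update_layout(height=700, showlegend=False)
fig.show()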
Level 2: Practical Implementation
Data Processing & Analysis Pipeline
1. Advanced Data Preprocessing
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler, OneHotEncoder, RobustScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.impute import SimpleImputer, KNNImputer
from sklearn.feature_selection import SelectKBest, f_classif, RFE
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from scipy import stats
import warnings
warnings.filterwarnings('ignore')
class DataPreprocessor:
def __init__(self):
self.numeric_features = []
self.categorical_features = []
self.preprocessor = None
self.feature_selector = None
self.outlier_detector = None
def analyze_data(self, df: pd.DataFrame) -> dict:
"""Comprehensive data analysis"""
analysis = {
'shape': df.shape,
'dtypes': df.dtypes.to_dict(),
'missing_values': df.isnull().sum().to_dict(),
'missing_percentage': (df.isnull().sum() / len(df) * 100).to_dict(),
'duplicates': df.duplicated().sum(),
'numeric_summary': df.describe().to_dict(),
'categorical_summary': {},
'correlation_matrix': None
}
# Categorical features analysis
for col in df.select_dtypes(include=['object', 'category']).columns:
analysis['categorical_summary'][col] = {
'unique_count': df[col].nunique(),
'unique_values': df[col].unique().tolist()[:10], # First 10 values
'value_counts': df[col].value_counts().head().to_dict()
}
# Correlation matrix for numeric features
numeric_df = df.select_dtypes(include=[np.number])
if len(numeric_df.columns) > 1:
analysis['correlation_matrix'] = numeric_df.corr().to_dict()
return analysis
def detect_outliers(self, df: pd.DataFrame, method: str = 'iqr') -> pd.DataFrame:
"""Detect outliers using various methods"""
numeric_df = df.select_dtypes(include=[np.number])
outliers_info = {}
for col in numeric_df.columns:
if method == 'iqr':
Q1 = df[col].quantile(0.25)
Q3 = df[col].quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
outliers = df[(df[col] < lower_bound) | (df[col] > upper_bound)]
elif method == 'zscore':
col_data = df[col].dropna()
z_scores = np.abs(stats.zscore(col_data))
outliers = df.loc[col_data.index[z_scores > 3]]
elif method == 'isolation_forest':
from sklearn.ensemble import IsolationForest
col_data = df[[col]].dropna()
iso_forest = IsolationForest(contamination=0.1, random_state=42)
outlier_labels = iso_forest.fit_predict(col_data)
outliers = df.loc[col_data.index[outlier_labels == -1]]
outliers_info[col] = {
'count': len(outliers),
'percentage': len(outliers) / len(df) * 100,
'indices': outliers.index.tolist()
}
return outliers_info
def build_preprocessing_pipeline(self, X: pd.DataFrame,
handle_outliers: bool = True,
feature_selection: bool = True,
selection_method: str = 'k_best') -> Pipeline:
"""Build comprehensive preprocessing pipeline"""
# Identify feature types
self.numeric_features = X.select_dtypes(include=[np.number]).columns.tolist()
self.categorical_features = X.select_dtypes(include=['object', 'category']).columns.tolist()
# Numeric preprocessing
numeric_transformer = Pipeline(steps=[
('imputer', SimpleImputer(strategy='median')),
('scaler', RobustScaler()) # More robust to outliers than StandardScaler
])
# Categorical preprocessing
categorical_transformer = Pipeline(steps=[
('imputer', SimpleImputer(strategy='most_frequent')),
('onehot', OneHotEncoder(handle_unknown='ignore'))
])
# Column transformer
preprocessor = ColumnTransformer(
transformers=[
('num', numeric_transformer, self.numeric_features),
('cat', categorical_transformer, self.categorical_features)
])
pipeline_steps = [('preprocessor', preprocessor)]
# Feature selection
if feature_selection and selection_method == 'k_best':
selector = SelectKBest(score_func=f_classif, k=10)
pipeline_steps.append(('feature_selection', selector))
elif feature_selection and selection_method == 'rfe':
from sklearn.ensemble import RandomForestClassifier
estimator = RandomForestClassifier(n_estimators=100, random_state=42)
selector = RFE(estimator=estimator, n_features_to_select=10)
pipeline_steps.append(('feature_selection', selector))
self.preprocessor = Pipeline(pipeline_steps)
return self.preprocessor
def transform_data(self, X: pd.DataFrame, y: pd.Series = None) -> np.ndarray:
"""Transform data using preprocessing pipeline"""
if self.preprocessor is None:
self.build_preprocessing_pipeline(X)
if y is not None:
return self.preprocessor.fit_transform(X, y)
else:
return self.preprocessor.transform(X)
def get_feature_importance(self, X: pd.DataFrame, y: pd.Series) -> dict:
"""Get feature importance using various methods"""
# Prepare data
X_processed = self.transform_data(X, y)
feature_importance = {}
# Random Forest importance
rf = RandomForestClassifier(n_estimators=100, random_state=42)
rf.fit(X_processed, y)
# Get feature names (simplified for this example)
feature_names = [f"feature_{i}" for i in range(X_processed.shape[1])]
feature_importance['random_forest'] = dict(zip(feature_names, rf.feature_importances_))
# Mutual information
from sklearn.feature_selection import mutual_info_classif
mi_scores = mutual_info_classif(X_processed, y)
feature_importance['mutual_info'] = dict(zip(feature_names, mi_scores))
# ANOVA F-test
from sklearn.feature_selection import f_classif
f_scores, _ = f_classif(X_processed, y)
feature_importance['anova_f'] = dict(zip(feature_names, f_scores))
return feature_importance
def create_features(self, df: pd.DataFrame) -> pd.DataFrame:
"""Create engineered features"""
df_engineered = df.copy()
numeric_cols = df.select_dtypes(include=[np.number]).columns
# Polynomial features for top correlated columns
if len(numeric_cols) >= 2:
# Interaction features
for i, col1 in enumerate(numeric_cols[:3]): # Limit to first 3 columns
for col2 in numeric_cols[i+1:4]: # Limit interactions
df_engineered[f'{col1}_x_{col2}'] = df[col1] * df[col2]
df_engineered[f'{col1}_div_{col2}'] = df[col1] / (df[col2] + 1e-6)
# Polynomial features
for col in numeric_cols[:3]: # Limit to first 3 columns
df_engineered[f'{col}_squared'] = df[col] ** 2
df_engineered[f'{col}_cubed'] = df[col] ** 3
df_engineered[f'{col}_log'] = np.log1p(np.abs(df[col]))
df_engineered[f'{col}_sqrt'] = np.sqrt(np.abs(df[col]))
# Statistical features
if len(numeric_cols) >= 3:
df_engineered['mean_of_numeric'] = df[numeric_cols].mean(axis=1)
df_engineered['std_of_numeric'] = df[numeric_cols].std(axis=1)
df_engineered['min_of_numeric'] = df[numeric_cols].min(axis=1)
df_engineered['max_of_numeric'] = df[numeric_cols].max(axis=1)
df_engineered['sum_of_numeric'] = df[numeric_cols].sum(axis=1)
# Date/time features (if date columns exist)
date_cols = df.select_dtypes(include=['datetime64']).columns
for col in date_cols:
df_engineered[f'{col}_year'] = df[col].dt.year
df_engineered[f'{col}_month'] = df[col].dt.month
df_engineered[f'{col}_day'] = df[col].dt.day
df_engineered[f'{col}_dayofweek'] = df[col].dt.dayofweek
df_engineered[f'{col}_quarter'] = df[col].dt.quarter
return df_engineered
# Usage example
# preprocessor = DataPreprocessor()
# analysis = preprocessor.analyze_data(df)
# outliers = preprocessor.detect_outliers(df, method='iqr')
# pipeline = preprocessor.build_preprocessing_pipeline(X)
# X_processed = preprocessor.transform_data(X, y)
# feature_importance = preprocessor.get_feature_importance(X, y)
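A minimal runnable sketch of the class above on synthetic data; the column names, sizes, and distributions are illustrative assumptions, not part of the skill:
# Illustrative demo of DataPreprocessor on a small synthetic dataset
rng = np.random.default_rng(42)
demo_df = pd.DataFrame({
    'age': rng.normal(40, 12, 500),
    'income': rng.lognormal(10, 0.5, 500),
    'segment': rng.choice(['a', 'b', 'c'], 500),
    'target': rng.integers(0, 2, 500)
})
preprocessor = DataPreprocessor()
analysis = preprocessor.analyze_data(demo_df)
print(f"Rows: {analysis['shape'][0]}, duplicates: {analysis['duplicates']}")
outliers = preprocessor.detect_outliers(demo_df, method='iqr')
print({col: info['count'] for col, info in outliers.items()})
X_demo = demo_df.drop('target', axis=1)
y_demo = demo_df['target']
# Skip feature selection here because the toy data has fewer than 10 features
preprocessor.build_preprocessing_pipeline(X_demo, feature_selection=False)
X_processed = preprocessor.transform_data(X_demo, y_demo)
print(f"Processed feature matrix shape: {X_processed.shape}")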
2. Statistical Analysis & Hypothesis Testing
import scipy.stats as stats
import statsmodels.api as sm
from statsmodels.stats.power import TTestPower, GofChisquarePower
from statsmodels.stats.multicomp import pairwise_tukeyhsd
import pingouin as pg
import numpy as np
import pandas as pd
class StatisticalAnalyzer:
def __init__(self):
self.results = {}
def descriptive_statistics(self, data: pd.DataFrame,
columns: list = None) -> dict:
"""Comprehensive descriptive statistics"""
if columns is None:
columns = data.select_dtypes(include=[np.number]).columns.tolist()
stats_dict = {}
for col in columns:
series = data[col].dropna()
stats_dict[col] = {
'count': len(series),
'mean': series.mean(),
'median': series.median(),
'std': series.std(),
'var': series.var(),
'min': series.min(),
'max': series.max(),
'range': series.max() - series.min(),
'q25': series.quantile(0.25),
'q75': series.quantile(0.75),
'iqr': series.quantile(0.75) - series.quantile(0.25),
'skewness': stats.skew(series),
'kurtosis': stats.kurtosis(series),
'cv': series.std() / series.mean() if series.mean() != 0 else np.nan
}
# Confidence intervals
confidence_level = 0.95
degrees_freedom = len(series) - 1
sample_mean = series.mean()
sample_std = series.std()
standard_error = sample_std / np.sqrt(len(series))
t_critical = stats.t.ppf((1 + confidence_level) / 2, degrees_freedom)
margin_error = t_critical * standard_error
stats_dict[col]['confidence_interval'] = {
'lower': sample_mean - margin_error,
'upper': sample_mean + margin_error,
'level': confidence_level
}
return stats_dict
def normality_tests(self, data: pd.Series, alpha: float = 0.05) -> dict:
"""Test for normality using multiple methods"""
data_clean = data.dropna()
# Shapiro-Wilk test (for n < 5000)
shapiro_stat, shapiro_p = stats.shapiro(data_clean[:5000])
# Kolmogorov-Smirnov test
ks_stat, ks_p = stats.kstest(data_clean, 'norm',
args=(data_clean.mean(), data_clean.std()))
# Anderson-Darling test
ad_stat, ad_critical_values, ad_significance_levels = stats.anderson(data_clean, 'norm')
# D'Agostino's K2 test
dagostino_stat, dagostino_p = stats.normaltest(data_clean)
results = {
'shapiro_wilk': {
'statistic': shapiro_stat,
'p_value': shapiro_p,
'is_normal': shapiro_p > alpha,
'alpha': alpha
},
'kolmogorov_smirnov': {
'statistic': ks_stat,
'p_value': ks_p,
'is_normal': ks_p > alpha,
'alpha': alpha
},
'anderson_darling': {
'statistic': ad_stat,
'critical_values': dict(zip(ad_significance_levels, ad_critical_values)),
'is_normal': ad_stat < ad_critical_values[2] # 5% significance
},
'dagostino_k2': {
'statistic': dagostino_stat,
'p_value': dagostino_p,
'is_normal': dagostino_p > alpha,
'alpha': alpha
}
}
return results
def hypothesis_testing(self, data1: pd.Series, data2: pd.Series = None,
test_type: str = 'auto', alpha: float = 0.05,
alternative: str = 'two-sided') -> dict:
"""Comprehensive hypothesis testing"""
if data2 is None:
# One-sample tests
if test_type == 'auto':
# Determine appropriate test based on normality
normality = self.normality_tests(data1, alpha)
# Use t-test if data appears normal, otherwise Wilcoxon
if normality['shapiro_wilk']['is_normal']:
return self._one_sample_t_test(data1, alpha, alternative)
else:
return self._one_sample_wilcoxon_test(data1, alpha, alternative)
else:
# Two-sample tests
if test_type == 'auto':
# Check normality and equal variance assumptions
normality1 = self.normality_tests(data1, alpha)
normality2 = self.normality_tests(data2, alpha)
# Levene's test for equal variances
levene_stat, levene_p = stats.levene(data1.dropna(), data2.dropna())
if (normality1['shapiro_wilk']['is_normal'] and
normality2['shapiro_wilk']['is_normal'] and
levene_p > alpha):
# Use t-test if assumptions are met
return self._two_sample_t_test(data1, data2, alpha, alternative)
else:
# Use non-parametric test
return self._mann_whitney_test(data1, data2, alpha, alternative)
return {'error': 'Invalid test configuration'}
def _one_sample_t_test(self, data: pd.Series, alpha: float, alternative: str) -> dict:
"""One-sample t-test"""
t_stat, p_value = stats.ttest_1samp(data.dropna(), 0, alternative=alternative)
degrees_freedom = len(data.dropna()) - 1
confidence_level = 1 - alpha
# Effect size (Cohen's d)
cohens_d = data.mean() / data.std()
return {
'test': 'One-sample t-test',
'statistic': t_stat,
'p_value': p_value,
'degrees_freedom': degrees_freedom,
'alpha': alpha,
'is_significant': p_value < alpha,
'alternative': alternative,
'effect_size': cohens_d,
'effect_size_interpretation': self._interpret_cohens_d(cohens_d)
}
def _one_sample_wilcoxon_test(self, data: pd.Series, alpha: float, alternative: str) -> dict:
"""One-sample Wilcoxon signed-rank test"""
wilcoxon_stat, p_value = stats.wilcoxon(data.dropna(), alternative=alternative)
return {
'test': 'One-sample Wilcoxon signed-rank test',
'statistic': wilcoxon_stat,
'p_value': p_value,
'alpha': alpha,
'is_significant': p_value < alpha,
'alternative': alternative,
'note': 'Non-parametric test - does not assume normality'
}
def _two_sample_t_test(self, data1: pd.Series, data2: pd.Series,
alpha: float, alternative: str) -> dict:
"""Two-sample t-test"""
t_stat, p_value = stats.ttest_ind(data1.dropna(), data2.dropna(), alternative=alternative)
# Effect size (Cohen's d)
n1, n2 = len(data1.dropna()), len(data2.dropna())
pooled_std = np.sqrt(((n1-1)*data1.var() + (n2-1)*data2.var()) / (n1+n2-2))
cohens_d = (data1.mean() - data2.mean()) / pooled_std
return {
'test': 'Two-sample t-test',
'statistic': t_stat,
'p_value': p_value,
'alpha': alpha,
'is_significant': p_value < alpha,
'alternative': alternative,
'effect_size': cohens_d,
'effect_size_interpretation': self._interpret_cohens_d(cohens_d),
'sample_sizes': {'sample1': n1, 'sample2': n2}
}
def _mann_whitney_test(self, data1: pd.Series, data2: pd.Series,
alpha: float, alternative: str) -> dict:
"""Mann-Whitney U test"""
u_stat, p_value = stats.mannwhitneyu(data1.dropna(), data2.dropna(),
alternative=alternative)
return {
'test': 'Mann-Whitney U test',
'statistic': u_stat,
'p_value': p_value,
'alpha': alpha,
'is_significant': p_value < alpha,
'alternative': alternative,
'note': 'Non-parametric test - does not assume normality'
}
def _interpret_cohens_d(self, d: float) -> str:
"""Interpret Cohen's d effect size"""
abs_d = abs(d)
if abs_d < 0.2:
return 'negligible'
elif abs_d < 0.5:
return 'small'
elif abs_d < 0.8:
return 'medium'
else:
return 'large'
def correlation_analysis(self, data: pd.DataFrame,
method: str = 'pearson') -> dict:
"""Comprehensive correlation analysis"""
numeric_data = data.select_dtypes(include=[np.number])
if method not in ('pearson', 'spearman', 'kendall'):
raise ValueError(f"Unsupported correlation method: {method}")
corr_matrix, p_values = self._correlation_with_pvalues(numeric_data, method)
# Find significant correlations
significant_correlations = []
for i, col1 in enumerate(numeric_data.columns):
for j, col2 in enumerate(numeric_data.columns):
if i < j: # Avoid duplicates
corr_val = corr_matrix.iloc[i, j]
p_val = p_values.iloc[i, j]
if abs(corr_val) > 0.3 and p_val < 0.05: # Thresholds
significant_correlations.append({
'variable1': col1,
'variable2': col2,
'correlation': corr_val,
'p_value': p_val,
'strength': self._interpret_correlation(corr_val)
})
return {
'correlation_matrix': corr_matrix.to_dict(),
'p_value_matrix': p_values.to_dict(),
'significant_correlations': significant_correlations,
'method': method
}
def _correlation_with_pvalues(self, data: pd.DataFrame, method: str = 'pearson') -> tuple:
"""Correlation matrix with pairwise p-values for the chosen method"""
corr_funcs = {'pearson': stats.pearsonr, 'spearman': stats.spearmanr, 'kendall': stats.kendalltau}
corr_func = corr_funcs[method]
corr_matrix = data.corr(method=method)
# Pairwise p-values computed on overlapping non-null observations
p_values = pd.DataFrame(index=data.columns, columns=data.columns, dtype=float)
for i, col1 in enumerate(data.columns):
for j, col2 in enumerate(data.columns):
if i == j:
p_values.iloc[i, j] = 0.0
elif i < j:
pair = data[[col1, col2]].dropna()
_, p_val = corr_func(pair[col1], pair[col2])
p_values.iloc[i, j] = p_val
p_values.iloc[j, i] = p_val
return corr_matrix, p_values
def _interpret_correlation(self, r: float) -> str:
"""Interpret correlation coefficient magnitude"""
abs_r = abs(r)
if abs_r < 0.1:
return 'negligible'
elif abs_r < 0.3:
return 'weak'
elif abs_r < 0.5:
return 'moderate'
elif abs_r < 0.7:
return 'strong'
else:
return 'very strong'
def power_analysis(self, effect_size: float, alpha: float = 0.05,
power: float = 0.8, sample_size: int = None,
test_type: str = 't-test') -> dict:
"""Statistical power analysis"""
if test_type == 't-test':
power_analysis = TTestPower()
if sample_size is None:
# Calculate required sample size
required_n = power_analysis.solve_power(
effect_size=effect_size,
alpha=alpha,
power=power,
alternative='two-sided'
)
return {
'test_type': test_type,
'required_sample_size': required_n,
'effect_size': effect_size,
'alpha': alpha,
'desired_power': power
}
else:
# Calculate achieved power
achieved_power = power_analysis.power(
effect_size=effect_size,
nobs=sample_size,
alpha=alpha,
alternative='two-sided'
)
return {
'test_type': test_type,
'sample_size': sample_size,
'achieved_power': achieved_power,
'effect_size': effect_size,
'alpha': alpha
}
return {'error': 'Unsupported test type for power analysis'}
# Usage example
# analyzer = StatisticalAnalyzer()
# desc_stats = analyzer.descriptive_statistics(df)
# normality = analyzer.normality_tests(df['column'])
# hypothesis_result = analyzer.hypothesis_testing(df['group1'], df['group2'])
# corr_analysis = analyzer.correlation_analysis(df)
# power_result = analyzer.power_analysis(effect_size=0.5, sample_size=100)
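A minimal runnable sketch of the analyzer on two simulated groups with a known mean shift; the group sizes and the 0.5 standard deviation effect are illustrative assumptions:
# Illustrative demo: simulated groups with a true mean difference of 0.5 SD
rng = np.random.default_rng(0)
group1 = pd.Series(rng.normal(0.0, 1.0, 200))
group2 = pd.Series(rng.normal(0.5, 1.0, 200))
analyzer = StatisticalAnalyzer()
normality = analyzer.normality_tests(group1)
print("group1 normal (Shapiro-Wilk):", normality['shapiro_wilk']['is_normal'])
result = analyzer.hypothesis_testing(group1, group2)
print(result['test'], "p =", round(result['p_value'], 4),
      "effect size =", round(result.get('effect_size', float('nan')), 3))
# Sample size needed to detect d = 0.5 at 80% power with alpha = 0.05
power = analyzer.power_analysis(effect_size=0.5, alpha=0.05, power=0.8)
print("required sample size:", round(power['required_sample_size']))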
3. Machine Learning Model Development
from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score
from sklearn.preprocessing import StandardScaler
import xgboost as xgb
import lightgbm as lgb
import optuna
class MLModelBuilder:
def __init__(self):
self.models = {}
self.best_model = None
self.feature_importance = {}
self.evaluation_results = {}
def prepare_data(self, X: pd.DataFrame, y: pd.Series,
test_size: float = 0.2, random_state: int = 42) -> tuple:
"""Prepare data for machine learning"""
# Handle missing values
X_clean = X.fillna(X.median(numeric_only=True))
for col in X_clean.select_dtypes(include=['object', 'category']).columns:
X_clean[col] = X_clean[col].fillna(X_clean[col].mode()[0] if not X_clean[col].mode().empty else 'unknown')
# Encode categorical variables
X_encoded = pd.get_dummies(X_clean, drop_first=True)
# Split data
X_train, X_test, y_train, y_test = train_test_split(
X_encoded, y, test_size=test_size, random_state=random_state, stratify=y
)
return X_train, X_test, y_train, y_test
def build_base_models(self, X_train, X_test, y_train, y_test) -> dict:
"""Build and evaluate multiple base models"""
models = {
'logistic_regression': LogisticRegression(max_iter=1000, random_state=42),
'random_forest': RandomForestClassifier(n_estimators=100, random_state=42),
'gradient_boosting': GradientBoostingClassifier(random_state=42),
'xgboost': xgb.XGBClassifier(random_state=42, eval_metric='logloss'),
'lightgbm': lgb.LGBMClassifier(random_state=42, verbose=-1)
}
results = {}
for name, model in models.items():
print(f"Training {name}...")
# Train model
model.fit(X_train, y_train)
# Make predictions
y_pred = model.predict(X_test)
y_pred_proba = None
if hasattr(model, 'predict_proba'):
y_pred_proba = model.predict_proba(X_test)[:, 1]
# Evaluate
accuracy = model.score(X_test, y_test)
if len(np.unique(y_test)) == 2 and y_pred_proba is not None:
roc_auc = roc_auc_score(y_test, y_pred_proba)
else:
roc_auc = None
# Cross-validation
cv_scores = cross_val_score(model, X_train, y_train, cv=5, scoring='accuracy')
results[name] = {
'model': model,
'accuracy': accuracy,
'roc_auc': roc_auc,
'cv_mean': cv_scores.mean(),
'cv_std': cv_scores.std(),
'predictions': y_pred,
'probabilities': y_pred_proba
}
# Feature importance
if hasattr(model, 'feature_importances_'):
importance_dict = dict(zip(X_train.columns, model.feature_importances_))
self.feature_importance[name] = importance_dict
self.models = results
return results
def hyperparameter_tuning(self, X_train, y_train, model_type: str = 'random_forest',
n_trials: int = 100) -> dict:
"""Hyperparameter tuning using Optuna"""
def objective(trial):
if model_type == 'random_forest':
params = {
'n_estimators': trial.suggest_int('n_estimators', 50, 300),
'max_depth': trial.suggest_int('max_depth', 3, 20),
'min_samples_split': trial.suggest_int('min_samples_split', 2, 20),
'min_samples_leaf': trial.suggest_int('min_samples_leaf', 1, 10),
'max_features': trial.suggest_categorical('max_features', ['sqrt', 'log2', None])
}
model = RandomForestClassifier(**params, random_state=42, n_jobs=-1)
elif model_type == 'xgboost':
params = {
'n_estimators': trial.suggest_int('n_estimators', 50, 300),
'max_depth': trial.suggest_int('max_depth', 3, 10),
'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.3),
'subsample': trial.suggest_float('subsample', 0.6, 1.0),
'colsample_bytree': trial.suggest_float('colsample_bytree', 0.6, 1.0),
'gamma': trial.suggest_float('gamma', 0, 5),
'reg_alpha': trial.suggest_float('reg_alpha', 0, 5),
'reg_lambda': trial.suggest_float('reg_lambda', 0, 5)
}
model = xgb.XGBClassifier(**params, random_state=42, eval_metric='logloss')
elif model_type == 'lightgbm':
params = {
'n_estimators': trial.suggest_int('n_estimators', 50, 300),
'max_depth': trial.suggest_int('max_depth', 3, 15),
'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.3),
'num_leaves': trial.suggest_int('num_leaves', 20, 150),
'feature_fraction': trial.suggest_float('feature_fraction', 0.6, 1.0),
'bagging_fraction': trial.suggest_float('bagging_fraction', 0.6, 1.0),
'bagging_freq': trial.suggest_int('bagging_freq', 1, 10),
'min_child_samples': trial.suggest_int('min_child_samples', 5, 100)
}
model = lgb.LGBMClassifier(**params, random_state=42, verbose=-1)
# Cross-validation
scores = cross_val_score(model, X_train, y_train, cv=5, scoring='accuracy')
return scores.mean()
# Create study
study = optuna.create_study(direction='maximize')
study.optimize(objective, n_trials=n_trials)
# Train best model
best_params = study.best_params
if model_type == 'random_forest':
best_model = RandomForestClassifier(**best_params, random_state=42, n_jobs=-1)
elif model_type == 'xgboost':
best_model = xgb.XGBClassifier(**best_params, random_state=42, eval_metric='logloss')
elif model_type == 'lightgbm':
best_model = lgb.LGBMClassifier(**best_params, random_state=42, verbose=-1)
best_model.fit(X_train, y_train)
return {
'best_model': best_model,
'best_params': best_params,
'best_score': study.best_value,
'study': study
}
def evaluate_model(self, model, X_test, y_test, model_name: str = "model") -> dict:
"""Comprehensive model evaluation"""
# Make predictions
y_pred = model.predict(X_test)
y_pred_proba = None
if hasattr(model, 'predict_proba'):
y_pred_proba = model.predict_proba(X_test)
if len(np.unique(y_test)) == 2:
y_pred_proba_binary = y_pred_proba[:, 1]
else:
y_pred_proba_binary = None
# Basic metrics
accuracy = model.score(X_test, y_test)
# Classification report
class_report = classification_report(y_test, y_pred, output_dict=True)
# Confusion matrix
conf_matrix = confusion_matrix(y_test, y_pred)
# ROC AUC for binary classification
roc_auc = None
if y_pred_proba_binary is not None:
roc_auc = roc_auc_score(y_test, y_pred_proba_binary)
# Feature importance
feature_importance = None
if hasattr(model, 'feature_importances_'):
feature_importance = dict(zip(X_test.columns, model.feature_importances_))
evaluation = {
'model_name': model_name,
'accuracy': accuracy,
'classification_report': class_report,
'confusion_matrix': conf_matrix.tolist(),
'roc_auc': roc_auc,
'feature_importance': feature_importance,
'predictions': y_pred.tolist(),
'probabilities': y_pred_proba.tolist() if y_pred_proba is not None else None
}
self.evaluation_results[model_name] = evaluation
return evaluation
def ensemble_models(self, X_train, X_test, y_train, y_test,
models_to_ensemble: list = None) -> dict:
"""Create ensemble models"""
if models_to_ensemble is None:
models_to_ensemble = ['random_forest', 'xgboost', 'lightgbm']
# Select models for ensemble
ensemble_models = []
for model_name in models_to_ensemble:
if model_name in self.models:
ensemble_models.append((model_name, self.models[model_name]['model']))
if not ensemble_models:
raise ValueError("No valid models found for ensemble")
# Voting ensemble
from sklearn.ensemble import VotingClassifier
voting_ensemble = VotingClassifier(
estimators=ensemble_models,
voting='soft' if len(np.unique(y_train)) == 2 else 'hard'
)
voting_ensemble.fit(X_train, y_train)
voting_evaluation = self.evaluate_model(voting_ensemble, X_test, y_test, "voting_ensemble")
# Stacking ensemble
from sklearn.ensemble import StackingClassifier
base_estimators = [(name, model['model']) for name, model in self.models.items()
if name in models_to_ensemble]
stacking_ensemble = StackingClassifier(
estimators=base_estimators,
final_estimator=LogisticRegression(max_iter=1000, random_state=42),
cv=5
)
stacking_ensemble.fit(X_train, y_train)
stacking_evaluation = self.evaluate_model(stacking_ensemble, X_test, y_test, "stacking_ensemble")
return {
'voting_ensemble': voting_evaluation,
'stacking_ensemble': stacking_evaluation,
'voting_model': voting_ensemble,
'stacking_model': stacking_ensemble
}
def model_explainability(self, model, X_test, feature_names: list = None) -> dict:
"""Model explainability using SHAP and LIME"""
try:
import shap
import lime
import lime.lime_tabular
if feature_names is None:
feature_names = X_test.columns.tolist()
explainability_results = {}
# SHAP explanations
if hasattr(model, 'predict_proba'):
explainer = shap.TreeExplainer(model)
shap_values = explainer.shap_values(X_test)
# For binary classification, use the positive class
if isinstance(shap_values, list) and len(shap_values) == 2:
shap_values_positive = shap_values[1]
else:
shap_values_positive = shap_values
explainability_results['shap'] = {
'values': shap_values_positive.tolist(),
'base_values': explainer.expected_value.tolist() if hasattr(explainer.expected_value, '__iter__') else explainer.expected_value,
'feature_names': feature_names
}
# Feature importance from SHAP
shap_importance = np.abs(shap_values_positive).mean(0)
shap_importance_dict = dict(zip(feature_names, shap_importance))
explainability_results['shap_importance'] = shap_importance_dict
# LIME explanations
lime_explainer = lime.lime_tabular.LimeTabularExplainer(
X_test.values,
feature_names=feature_names,
mode='classification',
discretize_continuous=True
)
# Explain a few instances
lime_explanations = []
for i in range(min(5, len(X_test))):
exp = lime_explainer.explain_instance(
X_test.iloc[i].values,
model.predict_proba,
num_features=10
)
lime_explanations.append({
'instance_index': i,
'explanation': exp.as_list(),
'score': exp.score
})
explainability_results['lime'] = lime_explanations
except ImportError:
explainability_results = {'error': 'SHAP and LIME packages not installed'}
return explainability_results
# Usage example
# ml_builder = MLModelBuilder()
# X_train, X_test, y_train, y_test = ml_builder.prepare_data(X, y)
# base_models = ml_builder.build_base_models(X_train, X_test, y_train, y_test)
# tuning_result = ml_builder.hyperparameter_tuning(X_train, y_train, 'xgboost')
# evaluation = ml_builder.evaluate_model(tuning_result['best_model'], X_test, y_test, 'tuned_xgboost')
# ensemble_results = ml_builder.ensemble_models(X_train, X_test, y_train, y_test)
# explainability = ml_builder.model_explainability(tuning_result['best_model'], X_test)
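A minimal runnable sketch of the builder on scikit-learn's built-in breast cancer dataset; the 20-trial Optuna budget and the choice of random forest for tuning are illustrative:
# Illustrative demo of MLModelBuilder on a small built-in dataset
from sklearn.datasets import load_breast_cancer
data = load_breast_cancer(as_frame=True)
builder = MLModelBuilder()
X_train, X_test, y_train, y_test = builder.prepare_data(data.data, data.target)
base_results = builder.build_base_models(X_train, X_test, y_train, y_test)
for name, res in base_results.items():
    print(f"{name}: accuracy={res['accuracy']:.3f}, cv mean={res['cv_mean']:.3f}")
# Small tuning budget, for demonstration only
tuned = builder.hyperparameter_tuning(X_train, y_train, model_type='random_forest', n_trials=20)
evaluation = builder.evaluate_model(tuned['best_model'], X_test, y_test, 'tuned_random_forest')
print("tuned accuracy:", round(evaluation['accuracy'], 3),
      "ROC AUC:", round(evaluation['roc_auc'], 3))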
Level 3: Advanced Integration
Production ML Systems
4. ML Pipeline with Experiment Tracking
import mlflow
import mlflow.sklearn
import mlflow.xgboost
import joblib
import json
from datetime import datetime
import os
class ProductionMLPipeline:
def __init__(self, experiment_name: str, tracking_uri: str = None):
self.experiment_name = experiment_name
if tracking_uri:
mlflow.set_tracking_uri(tracking_uri)
mlflow.set_experiment(experiment_name)
self.experiment_id = mlflow.get_experiment_by_name(experiment_name).experiment_id
self.pipeline_stages = []
self.run_id = None
def run_complete_pipeline(self, data_path: str, target_column: str,
model_types: list = None, save_models: bool = True) -> dict:
"""Run complete ML pipeline with experiment tracking"""
with mlflow.start_run(run_name=f"pipeline_run_{datetime.now().strftime('%Y%m%d_%H%M%S')}") as run:
self.run_id = run.info.run_id
# Log dataset information
mlflow.log_param("data_path", data_path)
mlflow.log_param("target_column", target_column)
# Stage 1: Data Loading and Exploration
print("Stage 1: Loading and exploring data...")
data = self._load_data(data_path)
data_info = self._explore_data(data)
for key, value in data_info.items():
if isinstance(value, (int, float)):
mlflow.log_metric(f"data_{key}", value)
else:
mlflow.log_param(f"data_{key}", str(value))
# Stage 2: Data Preprocessing
print("Stage 2: Preprocessing data...")
preprocessor = DataPreprocessor()
processed_data = preprocessor.create_features(data)
X = processed_data.drop(target_column, axis=1)
y = processed_data[target_column]
ml_builder = MLModelBuilder()
X_train, X_test, y_train, y_test = ml_builder.prepare_data(X, y)
mlflow.log_param("train_size", len(X_train))
mlflow.log_param("test_size", len(X_test))
mlflow.log_param("feature_count", X_train.shape[1])
# Stage 3: Model Training and Evaluation
print("Stage 3: Training and evaluating models...")
if model_types is None:
model_types = ['random_forest', 'xgboost', 'lightgbm']
model_results = ml_builder.build_base_models(X_train, X_test, y_train, y_test)
best_model_name = None
best_score = 0
best_model = None
for model_name, result in model_results.items():
if model_name in model_types:
# Log model metrics
mlflow.log_metric(f"{model_name}_accuracy", result['accuracy'])
mlflow.log_metric(f"{model_name}_cv_mean", result['cv_mean'])
if result['roc_auc'] is not None:
mlflow.log_metric(f"{model_name}_roc_auc", result['roc_auc'])
# Log model
try:
if 'xgboost' in model_name:
mlflow.xgboost.log_model(result['model'], f"{model_name}_model")
else:
mlflow.sklearn.log_model(result['model'], f"{model_name}_model")
except Exception:
pass
# Track best model
if result['accuracy'] > best_score:
best_score = result['accuracy']
best_model_name = model_name
best_model = result['model']
# Stage 4: Hyperparameter Tuning for Best Model
print("Stage 4: Hyperparameter tuning...")
if best_model_name:
tuning_result = ml_builder.hyperparameter_tuning(
X_train, y_train,
model_type=best_model_name.replace('_tuned', ''),
n_trials=50
)
# Log best parameters
for param, value in tuning_result['best_params'].items():
mlflow.log_param(f"best_{param}", value)
mlflow.log_metric("best_tuned_score", tuning_result['best_score'])
# Evaluate tuned model
tuned_evaluation = ml_builder.evaluate_model(
tuning_result['best_model'], X_test, y_test,
f"{best_model_name}_tuned"
)
mlflow.log_metric("tuned_model_accuracy", tuned_evaluation['accuracy'])
if tuned_evaluation['roc_auc'] is not None:
mlflow.log_metric("tuned_model_roc_auc", tuned_evaluation['roc_auc'])
# Update best model if tuning improved performance
if tuned_evaluation['accuracy'] > best_score:
best_model = tuning_result['best_model']
best_score = tuned_evaluation['accuracy']
# Log feature importance
if tuned_evaluation['feature_importance']:
importance_json = json.dumps(tuned_evaluation['feature_importance'])
mlflow.log_text(importance_json, "feature_importance.json")
# Stage 5: Ensemble Creation
print("Stage 5: Creating ensemble models...")
try:
ensemble_results = ml_builder.ensemble_models(X_train, X_test, y_train, y_test)
for ensemble_type, result in ensemble_results.items():
if ensemble_type.endswith('_ensemble'):
mlflow.log_metric(f"{ensemble_type}_accuracy", result['accuracy'])
if result['roc_auc'] is not None:
mlflow.log_metric(f"{ensemble_type}_roc_auc", result['roc_auc'])
except Exception:
print("Ensemble creation failed, continuing...")
# Stage 6: Model Explainability
print("Stage 6: Generating model explanations...")
if best_model is not None:
explainability = ml_builder.model_explainability(best_model, X_test)
if 'shap_importance' in explainability:
shap_importance_json = json.dumps(explainability['shap_importance'])
mlflow.log_text(shap_importance_json, "shap_importance.json")
# Save final model
if save_models and best_model is not None:
model_path = f"models/{best_model_name}_final"
os.makedirs(model_path, exist_ok=True)
joblib.dump(best_model, f"{model_path}/model.joblib")
mlflow.log_artifact(f"{model_path}/model.joblib", "final_model")
# Generate pipeline report
pipeline_results = {
'run_id': self.run_id,
'experiment_id': self.experiment_id,
'data_info': data_info,
'best_model': best_model_name,
'best_score': best_score,
'model_results': {name: {'accuracy': result['accuracy'], 'cv_mean': result['cv_mean']}
for name, result in model_results.items()},
'timestamp': datetime.now().isoformat()
}
# Log pipeline results
results_json = json.dumps(pipeline_results, indent=2)
mlflow.log_text(results_json, "pipeline_results.json")
return pipeline_results
def _load_data(self, data_path: str) -> pd.DataFrame:
"""Load data from various formats"""
if data_path.endswith('.csv'):
return pd.read_csv(data_path)
elif data_path.endswith('.parquet'):
return pd.read_parquet(data_path)
elif data_path.endswith('.json'):
return pd.read_json(data_path)
else:
raise ValueError(f"Unsupported file format: {data_path}")
def _explore_data(self, df: pd.DataFrame) -> dict:
"""Basic data exploration"""
return {
'shape': df.shape,
'missing_values': df.isnull().sum().sum(),
'duplicate_rows': df.duplicated().sum(),
'numeric_columns': len(df.select_dtypes(include=[np.number]).columns),
'categorical_columns': len(df.select_dtypes(include=['object', 'category']).columns),
'memory_usage_mb': df.memory_usage(deep=True).sum() / 1024**2
}
def compare_experiments(self, metric: str = 'accuracy', top_k: int = 5) -> dict:
"""Compare multiple experiments"""
from mlflow.tracking import MlflowClient
client = MlflowClient()
runs = client.search_runs(
experiment_ids=[self.experiment_id],
order_by=[f"metrics.{metric} DESC"]
)
comparison_results = []
for run in runs[:top_k]:
run_data = {
'run_id': run.info.run_id,
'status': run.info.status,
'start_time': datetime.fromtimestamp(run.info.start_time / 1000),
'params': dict(run.data.params),
'metrics': dict(run.data.metrics)
}
comparison_results.append(run_data)
return {
'comparison_metric': metric,
'top_runs': comparison_results,
'total_runs': len(runs)
}
def deploy_model(self, run_id: str = None, model_name: str = "production_model") -> dict:
"""Prepare model for deployment"""
if run_id is None:
run_id = self.run_id
if run_id is None:
raise ValueError("No run ID provided and no current run found")
# Load model from MLflow
model_uri = f"runs:/{run_id}/final_model"
model = mlflow.sklearn.load_model(model_uri)
# Create deployment package
deployment_info = {
'model_uri': model_uri,
'run_id': run_id,
'model_name': model_name,
'deployment_timestamp': datetime.now().isoformat(),
'model_type': type(model).__name__
}
# Save deployment info
os.makedirs('deployment', exist_ok=True)
with open('deployment/deployment_info.json', 'w') as f:
json.dump(deployment_info, f, indent=2)
# Copy model to deployment directory
joblib.dump(model, 'deployment/model.joblib')
return deployment_info
# Usage example
# pipeline = ProductionMLPipeline("data_science_experiment")
# results = pipeline.run_complete_pipeline("data.csv", "target")
# comparison = pipeline.compare_experiments()
# deployment_info = pipeline.deploy_model()
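Once deploy_model has written the deployment/ directory, serving can be as simple as loading the artifact. A minimal sketch, where the record schema and column order are assumptions that must match the training features:
# Illustrative serving sketch: load the deployed artifact and score new records
import json
import joblib
import pandas as pd
model = joblib.load('deployment/model.joblib')
with open('deployment/deployment_info.json') as f:
    info = json.load(f)
print(f"Serving {info['model_name']} ({info['model_type']}) from run {info['run_id']}")
def predict_records(records):
    """Score a batch of dict-like records; keys must match the training feature columns."""
    frame = pd.DataFrame.from_records(records)
    return model.predict(frame).tolist()
# Hypothetical payload; replace the keys with your real feature names
# print(predict_records([{"feature1": 0.4, "feature2": 1.2}]))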
Related Skills
- moai-domain-ml: Advanced machine learning techniques
- moai-domain-testing: Model testing and validation
- moai-document-processing: Data processing and extraction
- moai-essentials-refactor: Code optimization and performance
Quick Start Checklist
- Load and explore dataset structure
- Perform comprehensive data preprocessing
- Conduct statistical analysis and hypothesis testing
- Build baseline machine learning models
- Implement hyperparameter tuning
- Create ensemble models for better performance
- Generate model explanations and feature importance
- Set up experiment tracking with MLflow
Data Science Best Practices
- Data Understanding: Always explore and understand your data first
- Preprocessing: Handle missing values, outliers, and feature engineering
- Statistical Validation: Use appropriate statistical tests and validation
- Model Selection: Try multiple algorithms and compare performance
- Cross-Validation: Use proper cross-validation techniques
- Hyperparameter Tuning: Optimize model parameters systematically
- Interpretability: Use SHAP, LIME for model explanations
- Experiment Tracking: Log all experiments for reproducibility
Data Science & Analytics - Build end-to-end data science solutions with comprehensive statistical analysis, machine learning, and experiment tracking capabilities.
Related Skills
pytorch-fsdp
Design: This Claude Skill provides expert guidance for PyTorch Fully Sharded Data Parallel (FSDP) training, helping developers implement distributed training solutions. It covers key features like parameter sharding, mixed precision, CPU offloading, and FSDP2 for large-scale model training. Use this skill when working with FSDP APIs, debugging distributed training code, or learning best practices for sharded data parallelism.
when-mapping-dependencies-use-dependency-mapper
Other: This skill provides comprehensive dependency mapping, analysis, and visualization for software projects across multiple package managers. It extracts dependency trees, detects issues, audits for security vulnerabilities, and generates visual graphs. Use it when you need to understand, analyze, or visualize your project's dependency relationships and security posture.
moai-domain-ml
Meta: This Claude Skill provides enterprise-grade machine learning capabilities with expertise in TensorFlow, PyTorch, and Scikit-learn. It specializes in AutoML, neural architecture search, and MLOps automation for building production-ready ML pipelines. Use it for scalable ML deployment with comprehensive monitoring and experiment tracking.
