analyze-diffusion-dynamics
について
このClaude Skillは、確率密度の時間発展をモデル化するために、確率微分方程式とフォッカー・プランク方程式を用いて拡散過程を解析します。初到達時間分布を計算し、ドリフトおよび拡散パラメータに対する感度分析を実行します。シミュレーション結果と閉形式解を検証する必要がある場合や、連続時間拡散ダイナミクスを分析する際にご利用ください。
クイックインストール
Claude Code
推奨npx skills add pjt222/agent-almanac -a claude-code/plugin add https://github.com/pjt222/agent-almanacgit clone https://github.com/pjt222/agent-almanac.git ~/.claude/skills/analyze-diffusion-dynamicsこのコマンドをClaude Codeにコピー&ペーストしてスキルをインストールします
ドキュメント
Analyze Diffusion Dynamics
Characterize diffusion process behavior → SDEs, Fokker-Planck, FPT distributions, param sensitivity, MC simulation valid.
Use When
- Derive prob density evolution → continuous-time diffusion
- Compute mean FPT or full FPT distributions → bounded diffusion
- Analyze drift/diffusion/boundary param effects
- Validate closed-form vs stochastic sim
- Build intuition for drift-diffusion or generative diffusion
In
- Required: SDE spec (drift fn, diffusion coeff, domain/boundaries)
- Required: Param values/ranges
- Required: Boundary conditions (absorbing, reflecting, mixed)
- Optional: Time horizon (default: auto-detect)
- Optional: Spatial discretization resolution (default: dx=0.001)
- Optional: MC trajectories (default: 10000)
Do
Step 1: Specify SDE Model
Define drift, diffusion coeff, boundaries.
- SDE in Ito form:
dX(t) = mu(X, t) dt + sigma(X, t) dW(t)
where mu = drift, sigma = diffusion coeff, W(t) = Wiener proc.
- Impl SDE:
import numpy as np
class DiffusionProcess:
"""A one-dimensional diffusion process specified by drift and diffusion functions."""
def __init__(self, drift_fn, diffusion_fn, lower_bound=None, upper_bound=None,
boundary_type="absorbing"):
self.drift = drift_fn
self.diffusion = diffusion_fn
self.lower_bound = lower_bound
self.upper_bound = upper_bound
self.boundary_type = boundary_type
# Example: Ornstein-Uhlenbeck process on [0, a]
ou_process = DiffusionProcess(
drift_fn=lambda x, t: 2.0 * (0.5 - x), # mean-reverting drift
diffusion_fn=lambda x, t: 0.1, # constant diffusion
lower_bound=0.0,
upper_bound=1.0,
boundary_type="absorbing"
)
# Example: Standard DDM (constant drift and diffusion)
ddm_process = DiffusionProcess(
drift_fn=lambda x, t: 0.5, # drift rate v
diffusion_fn=lambda x, t: 1.0, # unit diffusion (s=1, convention)
lower_bound=0.0, # lower absorbing boundary
upper_bound=1.5, # upper absorbing boundary (a)
boundary_type="absorbing"
)
- Define initial condition:
# Point source at x0
x0 = 0.75 # starting point (e.g., midpoint between boundaries for DDM with z=a/2)
# Or a distribution
initial_distribution = lambda x: np.exp(-50 * (x - 0.75)**2) # narrow Gaussian
- Validate param consistency:
def validate_process(process, x0):
"""Check that the SDE specification is self-consistent."""
assert process.lower_bound < process.upper_bound, "Lower bound must be less than upper bound"
assert process.lower_bound <= x0 <= process.upper_bound, \
f"Initial position {x0} outside bounds [{process.lower_bound}, {process.upper_bound}]"
test_drift = process.drift(x0, 0)
test_diff = process.diffusion(x0, 0)
assert np.isfinite(test_drift), f"Drift is not finite at x0={x0}"
assert test_diff > 0, f"Diffusion coefficient must be positive, got {test_diff}"
print(f"Process validated: drift={test_drift:.4f}, diffusion={test_diff:.4f} at x0={x0}")
validate_process(ddm_process, x0=0.75)
→ Fully spec'd SDE, finite drift, strictly pos diffusion, x0 in domain.
If err: Diffusion zero/neg anywhere → degenerate → check form. Drift infinite at boundary → reflecting may be better.
Step 2: Derive Fokker-Planck
SDE → PDE for prob density.
- FPE for transition density p(x, t):
dp/dt = -d/dx [mu(x,t) * p(x,t)] + (1/2) * d^2/dx^2 [sigma(x,t)^2 * p(x,t)]
- Constant coeffs (standard DDM) simplifies:
dp/dt = -v * dp/dx + (s^2 / 2) * d^2p/dx^2
- Numerical solution via finite diffs:
from scipy.sparse import diags
from scipy.sparse.linalg import spsolve
def solve_fokker_planck(process, x0, t_max, dx=0.001, dt=None):
"""Solve the FPE numerically using Crank-Nicolson scheme."""
x_grid = np.arange(process.lower_bound, process.upper_bound + dx, dx)
N = len(x_grid)
if dt is None:
max_sigma = max(process.diffusion(x, 0) for x in x_grid)
dt = 0.4 * dx**2 / max_sigma**2 # CFL-like stability condition
# Initial condition: narrow Gaussian centered at x0
p = np.exp(-((x_grid - x0)**2) / (2 * (2*dx)**2))
p[0] = 0 # absorbing boundary
p[-1] = 0 # absorbing boundary
p = p / (np.sum(p) * dx)
t_steps = int(t_max / dt)
survival = np.zeros(t_steps)
density_snapshots = []
for step in range(t_steps):
mu_vals = np.array([process.drift(x, step*dt) for x in x_grid])
sigma_vals = np.array([process.diffusion(x, step*dt) for x in x_grid])
D = 0.5 * sigma_vals**2
# Finite difference operators (interior points)
advection = -mu_vals[1:-1] / (2 * dx)
diffusion_coeff = D[1:-1] / dx**2
main_diag = 1 + dt * 2 * diffusion_coeff
upper_diag = dt * (-diffusion_coeff[:-1] - advection[:-1])
lower_diag = dt * (-diffusion_coeff[1:] + advection[1:])
A = diags([lower_diag, main_diag, upper_diag], [-1, 0, 1], format="csc")
p[1:-1] = spsolve(A, p[1:-1])
p[0] = 0
p[-1] = 0
survival[step] = np.sum(p[1:-1]) * dx
if step % (t_steps // 10) == 0:
density_snapshots.append((step * dt, p.copy()))
return x_grid, survival, density_snapshots
- Run + plot evolving density:
import matplotlib.pyplot as plt
x_grid, survival, snapshots = solve_fokker_planck(ddm_process, x0=0.75, t_max=5.0)
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))
for t_val, density in snapshots:
ax1.plot(x_grid, density, label=f"t={t_val:.2f}")
ax1.set_xlabel("x")
ax1.set_ylabel("p(x, t)")
ax1.set_title("Fokker-Planck Density Evolution")
ax1.legend()
t_vals = np.linspace(0, 5.0, len(survival))
ax2.plot(t_vals, survival)
ax2.set_xlabel("Time")
ax2.set_ylabel("Survival probability")
ax2.set_title("Survival Probability S(t)")
fig.tight_layout()
fig.savefig("fokker_planck_solution.png", dpi=150)
→ Density starts narrow peak at x0, spreads + drifts per SDE coeffs, decays as absorbed at boundaries. Survival monotonic 1 → 0.
If err: Oscillations/neg values → dt too large → reduce. Survival stays near 1 → boundaries too far or drift pushes away. Check solver boundary conditions.
Step 3: FPT Distributions
Derive distribution of times first reaching boundary.
- FPT density from survival:
def first_passage_time_density(survival, dt):
"""FPT density is the negative derivative of survival probability."""
fpt_density = -np.gradient(survival, dt)
fpt_density = np.maximum(fpt_density, 0) # enforce non-negativity
return fpt_density
- Standard DDM constant drift → known analytic:
def ddm_fpt_upper(t, v, a, z, s=1.0, n_terms=50):
"""Analytic FPT density at the upper boundary for constant-drift DDM.
Uses the infinite series representation (large-time expansion).
"""
if t <= 0:
return 0.0
density = 0.0
for k in range(1, n_terms + 1):
density += (k * np.pi * s**2 / a**2) * \
np.exp(-v * (a - z) / s**2 - 0.5 * v**2 * t / s**2) * \
np.sin(k * np.pi * z / a) * \
np.exp(-0.5 * (k * np.pi * s / a)**2 * t)
return density
- Summary stats of FPT:
def fpt_statistics(fpt_density, dt):
"""Compute mean, variance, and quantiles of the FPT distribution."""
t_vals = np.arange(len(fpt_density)) * dt
total_mass = np.sum(fpt_density) * dt
# Normalize
fpt_normed = fpt_density / total_mass if total_mass > 0 else fpt_density
mean_fpt = np.sum(t_vals * fpt_normed) * dt
var_fpt = np.sum((t_vals - mean_fpt)**2 * fpt_normed) * dt
# Quantiles via CDF
cdf = np.cumsum(fpt_normed) * dt
quantile_10 = t_vals[np.searchsorted(cdf, 0.1)]
quantile_50 = t_vals[np.searchsorted(cdf, 0.5)]
quantile_90 = t_vals[np.searchsorted(cdf, 0.9)]
return {
"mean": mean_fpt,
"std": np.sqrt(var_fpt),
"q10": quantile_10,
"q50": quantile_50,
"q90": quantile_90,
"total_probability": total_mass
}
- Two-boundary → separate FPT by boundary via prob flux at each absorbing wall (finite diff of density at boundary grid pts).
→ FPT density right-skewed unimodal. DDM pos drift → upper boundary FPT more mass + shorter mode than lower. Typical DDM (v=1, a=1.5, z=0.75) → mean FPT ~0.5-2.0s.
If err: Neg values → numerical diff noisy → apply small Gaussian smoothing. Total prob not ~1.0 → horizon too short (increase t_max) or prob leakage in solver.
Step 4: Param Sensitivity
Quantify param change effects on FPT.
- Define param grid:
param_ranges = {
"v": np.linspace(0.2, 3.0, 15), # drift rate
"a": np.linspace(0.5, 2.5, 15), # boundary separation
"z_ratio": np.linspace(0.3, 0.7, 9) # starting point as fraction of a
}
base_params = {"v": 1.0, "a": 1.5, "z_ratio": 0.5}
- Sweep each param, others at baseline:
sensitivity_results = {}
for param_name, param_values in param_ranges.items():
means = []
accuracies = []
for val in param_values:
params = base_params.copy()
params[param_name] = val
z = params["z_ratio"] * params["a"]
process = DiffusionProcess(
drift_fn=lambda x, t, v=params["v"]: v,
diffusion_fn=lambda x, t: 1.0,
lower_bound=0.0,
upper_bound=params["a"],
boundary_type="absorbing"
)
_, survival, _ = solve_fokker_planck(process, x0=z, t_max=10.0)
fpt = first_passage_time_density(survival, dt=10.0/len(survival))
stats = fpt_statistics(fpt, dt=10.0/len(survival))
means.append(stats["mean"])
accuracies.append(stats["total_probability"]) # proxy for upper boundary
sensitivity_results[param_name] = {
"values": param_values,
"mean_fpt": np.array(means),
"accuracy": np.array(accuracies)
}
- Plot sensitivity curves:
fig, axes = plt.subplots(1, 3, figsize=(18, 5))
for idx, (param_name, result) in enumerate(sensitivity_results.items()):
ax = axes[idx]
ax.plot(result["values"], result["mean_fpt"], "b-o", label="Mean FPT")
ax.set_xlabel(param_name)
ax.set_ylabel("Mean FPT")
ax.set_title(f"Sensitivity to {param_name}")
ax2 = ax.twinx()
ax2.plot(result["values"], result["accuracy"], "r--s", label="P(upper)")
ax2.set_ylabel("P(upper boundary)")
ax.legend(loc="upper left")
ax2.legend(loc="upper right")
fig.tight_layout()
fig.savefig("parameter_sensitivity.png", dpi=150)
- Partial derivatives (local sensitivity at baseline):
for param_name, result in sensitivity_results.items():
idx_base = np.argmin(np.abs(result["values"] - base_params[param_name]))
if idx_base > 0 and idx_base < len(result["values"]) - 1:
d_mean = (result["mean_fpt"][idx_base+1] - result["mean_fpt"][idx_base-1]) / \
(result["values"][idx_base+1] - result["values"][idx_base-1])
print(f"d(mean_FPT)/d({param_name}) at baseline: {d_mean:.4f}")
→ Drift (v) strong neg effect mean FPT + strong pos accuracy. Boundary sep (a) strong pos mean FPT (speed-accuracy tradeoff). Start (z) shifts accuracy, smaller effect on mean FPT.
If err: Flat or non-monotonic → check range wide + solver horizon captures full FPT. Non-monotonic mean FPT vs drift → solver bug.
Step 5: Validate vs Sim
MC sim of SDE → confirm analytic + numerical PDE.
- Euler-Maruyama sim:
def simulate_sde(process, x0, dt_sim=0.0001, t_max=10.0, n_trajectories=10000):
"""Simulate SDE paths and record first-passage times."""
n_steps = int(t_max / dt_sim)
fpt_upper = np.full(n_trajectories, np.nan)
fpt_lower = np.full(n_trajectories, np.nan)
x = np.full(n_trajectories, x0)
sqrt_dt = np.sqrt(dt_sim)
for step in range(n_steps):
t = step * dt_sim
active = np.isnan(fpt_upper) & np.isnan(fpt_lower)
if not active.any():
break
mu = np.array([process.drift(xi, t) for xi in x[active]])
sigma = np.array([process.diffusion(xi, t) for xi in x[active]])
dW = np.random.randn(active.sum()) * sqrt_dt
x[active] += mu * dt_sim + sigma * dW
# Check boundary crossings
hit_upper = active & (x >= process.upper_bound)
hit_lower = active & (x <= process.lower_bound)
fpt_upper[hit_upper] = (step + 1) * dt_sim
fpt_lower[hit_lower] = (step + 1) * dt_sim
return fpt_upper, fpt_lower
- Run sim + compute empirical FPT:
fpt_upper_sim, fpt_lower_sim = simulate_sde(ddm_process, x0=0.75, n_trajectories=50000)
# Empirical statistics
valid_upper = fpt_upper_sim[~np.isnan(fpt_upper_sim)]
valid_lower = fpt_lower_sim[~np.isnan(fpt_lower_sim)]
total_absorbed = len(valid_upper) + len(valid_lower)
accuracy_sim = len(valid_upper) / total_absorbed
print(f"Simulated accuracy: {accuracy_sim:.4f}")
print(f"Mean FPT (upper): {valid_upper.mean():.4f} +/- {valid_upper.std()/np.sqrt(len(valid_upper)):.4f}")
print(f"Mean FPT (lower): {valid_lower.mean():.4f} +/- {valid_lower.std()/np.sqrt(len(valid_lower)):.4f}")
- Compare sim vs analytic or PDE:
fig, ax = plt.subplots(figsize=(10, 6))
# Empirical histogram
ax.hist(valid_upper, bins=100, density=True, alpha=0.5, label="Simulation (upper)")
ax.hist(valid_lower, bins=100, density=True, alpha=0.5, label="Simulation (lower)")
# Analytical solution overlay
t_vals_analytic = np.linspace(0.01, 5.0, 500)
v, a, z = 0.5, 1.5, 0.75
fpt_analytic = [ddm_fpt_upper(t, v, a, z) for t in t_vals_analytic]
ax.plot(t_vals_analytic, fpt_analytic, "k-", linewidth=2, label="Analytic (upper)")
ax.set_xlabel("First-passage time")
ax.set_ylabel("Density")
ax.set_title("FPT Distribution: Simulation vs. Analytic")
ax.legend()
fig.savefig("fpt_validation.png", dpi=150)
- Quantify agreement:
from scipy.stats import ks_2samp
# Kolmogorov-Smirnov test between simulated and analytically-derived samples
analytic_cdf = np.cumsum(fpt_analytic) * (t_vals_analytic[1] - t_vals_analytic[0])
sim_sorted = np.sort(valid_upper)
sim_cdf = np.arange(1, len(sim_sorted)+1) / len(sim_sorted)
# Interpolate analytic CDF at simulation quantiles
from scipy.interpolate import interp1d
analytic_interp = interp1d(t_vals_analytic, analytic_cdf, bounds_error=False, fill_value=(0, 1))
max_diff = np.max(np.abs(sim_cdf - analytic_interp(sim_sorted)))
print(f"Max CDF difference (simulation vs. analytic): {max_diff:.4f}")
assert max_diff < 0.05, f"Simulation and analytic FPT differ by {max_diff:.4f} (threshold: 0.05)"
→ Sim histograms closely match analytic FPT. KS max CDF diff <0.05 for 50K trajectories. Mean FPT sim within 2 SE of analytic.
If err: Disagree → check Euler-Maruyama step → dt_sim small enough (try dt_sim=0.00001) → boundary crossings not missed. Series no converge → increase n_terms. Non-constant coeffs no analytic → compare 2 numerical methods (PDE vs sim).
Check
- SDE spec passes consistency (finite drift, pos diffusion, x0 in domain)
- FPE density integrates → decreases monotonic (survival)
- FPE solution no artifacts (oscillations, neg)
- FPT density non-neg + integrates ~1.0 across boundaries
- Sensitivity monotonic expected (v vs accuracy, a vs mean FPT)
- MC mean FPT within 2 SE of PDE/analytic
- KS max CDF diff sim vs analytic <0.05
Traps
- Euler-Maruyama step too large: Large dt_sim → trajectories overshoot boundaries → biased FPT. Use dt_sim ≤1/10 expected mean FPT or boundary-corrected scheme.
- Truncate FPT series too early: Analytic DDM FPT uses infinite series. <20 terms → visible artifacts at short times. ≥50 + check convergence.
- Ignore numerical diffusion in PDE: 1st-order finite diff → artificial diffusion broadens FPT. Use Crank-Nicolson or higher-order.
- Confuse Ito + Stratonovich: FPE differs by SDE convention. Above assumes Ito. Stratonovich → add noise-induced drift correction.
- Not accounting both boundaries: Two-boundary → total absorption prob = 1.0. Only upper → incorrect stats.
→
fit-drift-diffusion-model— applies dynamics → estimate params from behavioral dataimplement-diffusion-network— generative diffusion models discretize same SDE frameworkwrite-testthat-tests— testing numerical solvers + analytic implscreate-technical-report— document diffusion analysis results
GitHub リポジトリ
関連スキル
llamaguard
その他LlamaGuardは、暴力やヘイトスピーチなど6つの安全性カテゴリーにおいて、LLMの入力と出力をモデレートするMetaの70-80億パラメータモデルです。94〜95%の精度を提供し、vLLM、Hugging Face、Amazon SageMakerを使用してデプロイ可能です。このスキルを使用して、AIアプリケーションにコンテンツフィルタリングと安全策を簡単に統合できます。
cost-optimization
その他このClaudeスキルは、リソースの適正サイジング、タグ付け戦略、支出分析を通じて、開発者がクラウドコストを最適化することを支援します。AWS、Azure、GCPにわたるクラウド支出の削減とコストガバナンスの実施のためのフレームワークを提供します。インフラコストの分析、リソースの適正サイジング、または予算制約への対応が必要な際にご利用ください。
quantizing-models-bitsandbytes
その他このスキルは、bitsandbytesを使用してLLMを8ビットまたは4ビット精度に量子化し、精度の低下を最小限に抑えつつ50〜75%のメモリ削減を実現します。限られたGPUメモリでより大規模なモデルを実行したり、推論を高速化するのに理想的で、INT8、NF4、FP4などのフォーマットをサポートしています。HuggingFace Transformersと統合され、QLoRAトレーニングや8ビットオプティマイザーを可能にします。
dispatching-parallel-agents
その他このClaudeスキルは、複数のエージェントを配備し、3つ以上の独立した問題を並行して調査・修正します。共有状態や依存関係がなく解決可能な、無関係な障害が発生するシナリオ向けに設計されています。中核となる機能は並列問題解決であり、効率を最大化するために独立した問題領域ごとに1つのエージェントを割り当てます。
