analyze-diffusion-dynamics
About
This Claude skill analyzes diffusion processes using stochastic differential equations and Fokker-Planck equations to model how probability densities evolve. It computes first-passage-time distributions and runs parameter sensitivity analyses for drift and diffusion parameters. Use it when you need to validate closed-form solutions against simulations or analyze continuous-time diffusion dynamics.
Quick Install
Claude Code
Recommended: npx skills add pjt222/agent-almanac -a claude-code
/plugin add https://github.com/pjt222/agent-almanac
git clone https://github.com/pjt222/agent-almanac.git ~/.claude/skills/analyze-diffusion-dynamics
Documentation
Analyze Diffusion Dynamics
Characterize diffusion process behavior → SDEs, Fokker-Planck, FPT distributions, param sensitivity, MC simulation validation.
Use When
- Derive prob density evolution → continuous-time diffusion
- Compute mean FPT or full FPT distributions → bounded diffusion
- Analyze drift/diffusion/boundary param effects
- Validate closed-form vs stochastic sim
- Build intuition for drift-diffusion or generative diffusion
In
- Required: SDE spec (drift fn, diffusion coeff, domain/boundaries)
- Required: Param values/ranges
- Required: Boundary conditions (absorbing, reflecting, mixed)
- Optional: Time horizon (default: auto-detect)
- Optional: Spatial discretization resolution (default: dx=0.001)
- Optional: MC trajectories (default: 10000)
Do
Step 1: Specify SDE Model
Define drift, diffusion coeff, boundaries.
- SDE in Ito form:
dX(t) = mu(X, t) dt + sigma(X, t) dW(t)
where mu = drift, sigma = diffusion coeff, W(t) = Wiener proc.
- Impl SDE:
import numpy as np
class DiffusionProcess:
    """A one-dimensional diffusion process specified by drift and diffusion functions."""
    def __init__(self, drift_fn, diffusion_fn, lower_bound=None, upper_bound=None,
                 boundary_type="absorbing"):
        self.drift = drift_fn
        self.diffusion = diffusion_fn
        self.lower_bound = lower_bound
        self.upper_bound = upper_bound
        self.boundary_type = boundary_type

# Example: Ornstein-Uhlenbeck process on [0, 1]
ou_process = DiffusionProcess(
    drift_fn=lambda x, t: 2.0 * (0.5 - x),  # mean-reverting drift
    diffusion_fn=lambda x, t: 0.1,          # constant diffusion
    lower_bound=0.0,
    upper_bound=1.0,
    boundary_type="absorbing"
)

# Example: Standard DDM (constant drift and diffusion)
ddm_process = DiffusionProcess(
    drift_fn=lambda x, t: 0.5,      # drift rate v
    diffusion_fn=lambda x, t: 1.0,  # unit diffusion (s=1, convention)
    lower_bound=0.0,                # lower absorbing boundary
    upper_bound=1.5,                # upper absorbing boundary (a)
    boundary_type="absorbing"
)
- Define initial condition:
# Point source at x0
x0 = 0.75 # starting point (e.g., midpoint between boundaries for DDM with z=a/2)
# Or a distribution
initial_distribution = lambda x: np.exp(-50 * (x - 0.75)**2) # narrow Gaussian
- Validate param consistency:
def validate_process(process, x0):
    """Check that the SDE specification is self-consistent."""
    assert process.lower_bound < process.upper_bound, "Lower bound must be less than upper bound"
    assert process.lower_bound <= x0 <= process.upper_bound, \
        f"Initial position {x0} outside bounds [{process.lower_bound}, {process.upper_bound}]"
    test_drift = process.drift(x0, 0)
    test_diff = process.diffusion(x0, 0)
    assert np.isfinite(test_drift), f"Drift is not finite at x0={x0}"
    assert test_diff > 0, f"Diffusion coefficient must be positive, got {test_diff}"
    print(f"Process validated: drift={test_drift:.4f}, diffusion={test_diff:.4f} at x0={x0}")

validate_process(ddm_process, x0=0.75)
→ Fully spec'd SDE, finite drift, strictly pos diffusion, x0 in domain.
If err: Diffusion zero/neg anywhere → degenerate → check form. Drift infinite at boundary → reflecting may be better.
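The check above probes the coefficients only at x0, while the failure mode is a diffusion coefficient that is zero or negative anywhere in the domain. A minimal sketch of a grid-wide probe follows. The helper name `check_coefficients_on_grid` and the default of 201 probe points are assumptions for illustration, not part of the skill.

```python
import numpy as np

def check_coefficients_on_grid(drift_fn, diffusion_fn, lower, upper, n_points=201, t=0.0):
    """Probe drift and diffusion on a uniform grid over [lower, upper].

    Returns (ok, problems), where problems is a list of human-readable strings.
    Hypothetical helper: name and defaults are illustrative assumptions.
    """
    xs = np.linspace(lower, upper, n_points)
    mu = np.array([drift_fn(x, t) for x in xs], dtype=float)
    sigma = np.array([diffusion_fn(x, t) for x in xs], dtype=float)

    problems = []
    bad_mu = ~np.isfinite(mu)
    if bad_mu.any():
        problems.append(f"drift not finite at {bad_mu.sum()} grid points, first x={xs[bad_mu][0]:.4f}")
    bad_sigma = ~(np.isfinite(sigma) & (sigma > 0))
    if bad_sigma.any():
        problems.append(f"diffusion not strictly positive at {bad_sigma.sum()} grid points, "
                        f"first x={xs[bad_sigma][0]:.4f}")
    return len(problems) == 0, problems

# Constant-coefficient DDM passes; sigma(x) = x degenerates at the lower wall x=0.
ok_ddm, _ = check_coefficients_on_grid(lambda x, t: 0.5, lambda x, t: 1.0, 0.0, 1.5)
ok_deg, msgs = check_coefficients_on_grid(lambda x, t: 0.0, lambda x, t: x, 0.0, 1.0)
```

A grid probe catches only degeneracies that fall on a probe point; a zero between grid points needs a finer grid or an analytic argument.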
Step 2: Derive Fokker-Planck
SDE → PDE for prob density.
- FPE for transition density p(x, t):
dp/dt = -d/dx [mu(x,t) * p(x,t)] + (1/2) * d^2/dx^2 [sigma(x,t)^2 * p(x,t)]
- Constant coeffs (standard DDM) simplifies:
dp/dt = -v * dp/dx + (s^2 / 2) * d^2p/dx^2
- Numerical solution via finite diffs:
from scipy.sparse import diags
from scipy.sparse.linalg import spsolve
def solve_fokker_planck(process, x0, t_max, dx=0.001, dt=None):
    """Solve the FPE numerically with an implicit (backward Euler) finite-difference scheme."""
    # linspace avoids the off-by-one grid length that float steps can cause in arange
    n_cells = int(round((process.upper_bound - process.lower_bound) / dx))
    x_grid = np.linspace(process.lower_bound, process.upper_bound, n_cells + 1)
    dx = x_grid[1] - x_grid[0]
    N = len(x_grid)
    if dt is None:
        max_sigma = max(process.diffusion(x, 0) for x in x_grid)
        # Conservative default; the implicit scheme stays stable for larger dt,
        # so pass dt explicitly to trade accuracy for speed
        dt = 0.4 * dx**2 / max_sigma**2
    # Initial condition: narrow Gaussian centered at x0
    p = np.exp(-((x_grid - x0)**2) / (2 * (2*dx)**2))
    p[0] = 0   # absorbing boundary
    p[-1] = 0  # absorbing boundary
    p = p / (np.sum(p) * dx)
    t_steps = int(t_max / dt)
    snapshot_every = max(1, t_steps // 10)  # avoid modulo by zero for short runs
    survival = np.zeros(t_steps)
    density_snapshots = []
    for step in range(t_steps):
        mu_vals = np.array([process.drift(x, step*dt) for x in x_grid])
        sigma_vals = np.array([process.diffusion(x, step*dt) for x in x_grid])
        D = 0.5 * sigma_vals**2
        # Finite difference operators (interior points)
        advection = -mu_vals[1:-1] / (2 * dx)
        diffusion_coeff = D[1:-1] / dx**2
        main_diag = 1 + dt * 2 * diffusion_coeff
        # Conservative form: the entry multiplying p[j] uses coefficients at grid point j
        upper_diag = dt * (-diffusion_coeff[1:] - advection[1:])    # multiplies p[i+1] in row i
        lower_diag = dt * (-diffusion_coeff[:-1] + advection[:-1])  # multiplies p[i-1] in row i
        A = diags([lower_diag, main_diag, upper_diag], [-1, 0, 1], format="csc")
        p[1:-1] = spsolve(A, p[1:-1])
        p[0] = 0
        p[-1] = 0
        survival[step] = np.sum(p[1:-1]) * dx
        if step % snapshot_every == 0:
            density_snapshots.append((step * dt, p.copy()))
    return x_grid, survival, density_snapshots
- Run + plot evolving density:
import matplotlib.pyplot as plt
x_grid, survival, snapshots = solve_fokker_planck(ddm_process, x0=0.75, t_max=5.0)
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))
for t_val, density in snapshots:
    ax1.plot(x_grid, density, label=f"t={t_val:.2f}")
ax1.set_xlabel("x")
ax1.set_ylabel("p(x, t)")
ax1.set_title("Fokker-Planck Density Evolution")
ax1.legend()
t_vals = np.linspace(0, 5.0, len(survival))
ax2.plot(t_vals, survival)
ax2.set_xlabel("Time")
ax2.set_ylabel("Survival probability")
ax2.set_title("Survival Probability S(t)")
fig.tight_layout()
fig.savefig("fokker_planck_solution.png", dpi=150)
→ Density starts narrow peak at x0, spreads + drifts per SDE coeffs, decays as absorbed at boundaries. Survival monotonic 1 → 0.
If err: Oscillations/neg values → dt too large → reduce. Survival stays near 1 → boundaries too far or drift pushes away. Check solver boundary conditions.
Step 3: FPT Distributions
Derive distribution of times first reaching boundary.
- FPT density from survival:
def first_passage_time_density(survival, dt):
    """FPT density is the negative derivative of survival probability."""
    fpt_density = -np.gradient(survival, dt)
    fpt_density = np.maximum(fpt_density, 0)  # enforce non-negativity
    return fpt_density
- Standard DDM constant drift → known analytic:
def ddm_fpt_upper(t, v, a, z, s=1.0, n_terms=50):
    """Analytic FPT density at the upper boundary for constant-drift DDM.
    Uses the infinite series representation (large-time expansion).
    The density is defective: it integrates to P(upper), not to 1.
    """
    if t <= 0:
        return 0.0
    # Upper-boundary density = lower-boundary density with v -> -v and z -> a - z
    prefactor = (np.pi * s**2 / a**2) * \
        np.exp(v * (a - z) / s**2 - 0.5 * v**2 * t / s**2)
    density = 0.0
    for k in range(1, n_terms + 1):
        density += k * np.sin(k * np.pi * (a - z) / a) * \
            np.exp(-0.5 * (k * np.pi * s / a)**2 * t)
    return prefactor * density
- Summary stats of FPT:
def fpt_statistics(fpt_density, dt):
    """Compute mean, variance, and quantiles of the FPT distribution."""
    t_vals = np.arange(len(fpt_density)) * dt
    total_mass = np.sum(fpt_density) * dt
    # Normalize
    fpt_normed = fpt_density / total_mass if total_mass > 0 else fpt_density
    mean_fpt = np.sum(t_vals * fpt_normed) * dt
    var_fpt = np.sum((t_vals - mean_fpt)**2 * fpt_normed) * dt
    # Quantiles via CDF; clip so an unreached quantile maps to the last time point
    cdf = np.cumsum(fpt_normed) * dt
    q_idx = np.minimum(np.searchsorted(cdf, [0.1, 0.5, 0.9]), len(t_vals) - 1)
    quantile_10, quantile_50, quantile_90 = t_vals[q_idx]
    return {
        "mean": mean_fpt,
        "std": np.sqrt(var_fpt),
        "q10": quantile_10,
        "q50": quantile_50,
        "q90": quantile_90,
        "total_probability": total_mass
    }
- Two-boundary → separate FPT by boundary via prob flux at each absorbing wall (finite diff of density at boundary grid pts).
→ FPT density right-skewed unimodal. DDM pos drift → upper boundary FPT more mass + shorter mode than lower. Typical DDM (v=1, a=1.5, z=0.75) → mean FPT ~0.5-2.0s.
If err: Neg values → numerical diff noisy → apply small Gaussian smoothing. Total prob not ~1.0 → horizon too short (increase t_max) or prob leakage in solver.
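The two-boundary split mentioned above can be sketched with one-sided differences. For the Ito FPE the probability current is J = mu*p - d/dx(D*p) with D = sigma^2/2; at an absorbing wall p = 0, so only the D * dp/dx term survives. The function name `boundary_fluxes` and its argument layout are assumptions for illustration; the estimate is first-order in dx.

```python
import numpy as np

def boundary_fluxes(p, D_lower, D_upper, dx):
    """Outward probability flux at each absorbing wall (first-order estimate).

    p        : density on a uniform grid with p[0] = p[-1] = 0
    D_lower  : D = sigma^2 / 2 evaluated at the lower wall
    D_upper  : D evaluated at the upper wall
    Hypothetical helper; each flux is the instantaneous FPT density at that wall.
    """
    flux_lower = D_lower * (p[1] - p[0]) / dx    # outward flux = +D * dp/dx at the lower wall
    flux_upper = D_upper * (p[-2] - p[-1]) / dx  # outward flux = -D * dp/dx at the upper wall
    return flux_lower, flux_upper

# Check on a profile with a known slope: p(x) = sin(pi x / a) has |dp/dx| = pi/a at both walls.
a, D = 1.5, 0.5
x = np.linspace(0.0, a, 1501)
p_test = np.sin(np.pi * x / a)
p_test[0] = p_test[-1] = 0.0
f_lo, f_hi = boundary_fluxes(p_test, D, D, x[1] - x[0])
```

Calling this once per time step inside the solver loop gives the FPT density at each wall; the sum of the two fluxes should track -dS/dt, which doubles as a leakage check.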
Step 4: Param Sensitivity
Quantify param change effects on FPT.
- Define param grid:
param_ranges = {
"v": np.linspace(0.2, 3.0, 15), # drift rate
"a": np.linspace(0.5, 2.5, 15), # boundary separation
"z_ratio": np.linspace(0.3, 0.7, 9) # starting point as fraction of a
}
base_params = {"v": 1.0, "a": 1.5, "z_ratio": 0.5}
- Sweep each param, others at baseline:
sensitivity_results = {}
# Coarser grid and explicit dt keep the sweep tractable (values are an assumption; refine as needed)
dx_sweep, dt_sweep, t_max_sweep = 0.01, 1e-3, 10.0
for param_name, param_values in param_ranges.items():
    means = []
    accuracies = []
    for val in param_values:
        params = base_params.copy()
        params[param_name] = val
        z = params["z_ratio"] * params["a"]
        process = DiffusionProcess(
            drift_fn=lambda x, t, v=params["v"]: v,
            diffusion_fn=lambda x, t: 1.0,
            lower_bound=0.0,
            upper_bound=params["a"],
            boundary_type="absorbing"
        )
        _, survival, _ = solve_fokker_planck(process, x0=z, t_max=t_max_sweep,
                                             dx=dx_sweep, dt=dt_sweep)
        fpt = first_passage_time_density(survival, dt=dt_sweep)
        stats = fpt_statistics(fpt, dt=dt_sweep)
        means.append(stats["mean"])
        # P(upper) from the closed form for constant drift and s=1;
        # stats["total_probability"] is the mass absorbed at both walls, not accuracy
        v, a = params["v"], params["a"]
        accuracies.append((1 - np.exp(-2 * v * z)) / (1 - np.exp(-2 * v * a)))
    sensitivity_results[param_name] = {
        "values": param_values,
        "mean_fpt": np.array(means),
        "accuracy": np.array(accuracies)
    }
- Plot sensitivity curves:
fig, axes = plt.subplots(1, 3, figsize=(18, 5))
for idx, (param_name, result) in enumerate(sensitivity_results.items()):
    ax = axes[idx]
    ax.plot(result["values"], result["mean_fpt"], "b-o", label="Mean FPT")
    ax.set_xlabel(param_name)
    ax.set_ylabel("Mean FPT")
    ax.set_title(f"Sensitivity to {param_name}")
    ax2 = ax.twinx()
    ax2.plot(result["values"], result["accuracy"], "r--s", label="P(upper)")
    ax2.set_ylabel("P(upper boundary)")
    ax.legend(loc="upper left")
    ax2.legend(loc="upper right")
fig.tight_layout()
fig.savefig("parameter_sensitivity.png", dpi=150)
- Partial derivatives (local sensitivity at baseline):
for param_name, result in sensitivity_results.items():
    idx_base = np.argmin(np.abs(result["values"] - base_params[param_name]))
    if idx_base > 0 and idx_base < len(result["values"]) - 1:
        d_mean = (result["mean_fpt"][idx_base+1] - result["mean_fpt"][idx_base-1]) / \
                 (result["values"][idx_base+1] - result["values"][idx_base-1])
        print(f"d(mean_FPT)/d({param_name}) at baseline: {d_mean:.4f}")
→ Drift (v) strong neg effect mean FPT + strong pos accuracy. Boundary sep (a) strong pos mean FPT (speed-accuracy tradeoff). Start (z) shifts accuracy, smaller effect on mean FPT.
If err: Flat or non-monotonic → check range wide + solver horizon captures full FPT. Non-monotonic mean FPT vs drift → solver bug.
Step 5: Validate vs Sim
MC sim of SDE → confirm analytic + numerical PDE.
- Euler-Maruyama sim:
def simulate_sde(process, x0, dt_sim=0.0001, t_max=10.0, n_trajectories=10000):
    """Simulate SDE paths and record first-passage times."""
    n_steps = int(t_max / dt_sim)
    fpt_upper = np.full(n_trajectories, np.nan)
    fpt_lower = np.full(n_trajectories, np.nan)
    x = np.full(n_trajectories, x0)
    sqrt_dt = np.sqrt(dt_sim)
    for step in range(n_steps):
        t = step * dt_sim
        # Trajectories that have not yet hit either boundary
        active = np.isnan(fpt_upper) & np.isnan(fpt_lower)
        if not active.any():
            break
        mu = np.array([process.drift(xi, t) for xi in x[active]])
        sigma = np.array([process.diffusion(xi, t) for xi in x[active]])
        dW = np.random.randn(active.sum()) * sqrt_dt
        x[active] += mu * dt_sim + sigma * dW
        # Check boundary crossings
        hit_upper = active & (x >= process.upper_bound)
        hit_lower = active & (x <= process.lower_bound)
        fpt_upper[hit_upper] = (step + 1) * dt_sim
        fpt_lower[hit_lower] = (step + 1) * dt_sim
    return fpt_upper, fpt_lower
- Run sim + compute empirical FPT:
fpt_upper_sim, fpt_lower_sim = simulate_sde(ddm_process, x0=0.75, n_trajectories=50000)
# Empirical statistics
valid_upper = fpt_upper_sim[~np.isnan(fpt_upper_sim)]
valid_lower = fpt_lower_sim[~np.isnan(fpt_lower_sim)]
total_absorbed = len(valid_upper) + len(valid_lower)
accuracy_sim = len(valid_upper) / total_absorbed
print(f"Simulated accuracy: {accuracy_sim:.4f}")
print(f"Mean FPT (upper): {valid_upper.mean():.4f} +/- {valid_upper.std()/np.sqrt(len(valid_upper)):.4f}")
print(f"Mean FPT (lower): {valid_lower.mean():.4f} +/- {valid_lower.std()/np.sqrt(len(valid_lower)):.4f}")
- Compare sim vs analytic or PDE:
fig, ax = plt.subplots(figsize=(10, 6))
# Empirical histogram
ax.hist(valid_upper, bins=100, density=True, alpha=0.5, label="Simulation (upper)")
ax.hist(valid_lower, bins=100, density=True, alpha=0.5, label="Simulation (lower)")
# Analytical solution overlay
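t_vals_analytic = np.linspace(0.01, 5.0, 500)
v, a, z = 0.5, 1.5, 0.75
# The analytic upper-boundary density integrates to P(upper); divide by it so the
# curve is comparable to the density=True histogram (closed form assumes s=1)
p_upper = (1 - np.exp(-2 * v * z)) / (1 - np.exp(-2 * v * a))
fpt_analytic = np.array([ddm_fpt_upper(t, v, a, z) for t in t_vals_analytic]) / p_upper
ax.plot(t_vals_analytic, fpt_analytic, "k-", linewidth=2, label="Analytic (upper, normalized)")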
ax.set_xlabel("First-passage time")
ax.set_ylabel("Density")
ax.set_title("FPT Distribution: Simulation vs. Analytic")
ax.legend()
fig.savefig("fpt_validation.png", dpi=150)
- Quantify agreement:
from scipy.interpolate import interp1d
# Kolmogorov-Smirnov statistic: max distance between simulated and analytic CDFs
analytic_cdf = np.cumsum(fpt_analytic) * (t_vals_analytic[1] - t_vals_analytic[0])
analytic_cdf = analytic_cdf / analytic_cdf[-1]  # condition on absorption at the upper boundary
sim_sorted = np.sort(valid_upper)
sim_cdf = np.arange(1, len(sim_sorted)+1) / len(sim_sorted)
# Interpolate analytic CDF at simulation quantiles
analytic_interp = interp1d(t_vals_analytic, analytic_cdf, bounds_error=False, fill_value=(0, 1))
max_diff = np.max(np.abs(sim_cdf - analytic_interp(sim_sorted)))
print(f"Max CDF difference (simulation vs. analytic): {max_diff:.4f}")
assert max_diff < 0.05, f"Simulation and analytic FPT differ by {max_diff:.4f} (threshold: 0.05)"
→ Sim histograms closely match analytic FPT. KS max CDF diff <0.05 for 50K trajectories. Mean FPT sim within 2 SE of analytic.
If err: Disagree → check Euler-Maruyama step → dt_sim small enough (try dt_sim=0.00001) → boundary crossings not missed. Series no converge → increase n_terms. Non-constant coeffs no analytic → compare 2 numerical methods (PDE vs sim).
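For the constant-drift DDM, two closed-form reference values make the "within 2 SE" check concrete: the upper-boundary hit probability and the mean FPT pooled over both boundaries. The sketch below uses the standard gambler's-ruin result for P(upper) and the identity E[T] = (a * P(upper) - z) / v, which follows from E[X_T] = z + v * E[T]. The function names are hypothetical, and the formulas assume constant drift v and constant noise s on [0, a].

```python
import numpy as np

def ddm_reference_values(v, a, z, s=1.0):
    """Closed-form P(upper) and pooled mean FPT for constant-drift diffusion on [0, a].

    Hypothetical helper; valid only for constant v and s with absorbing walls at 0 and a.
    """
    if abs(v) < 1e-12:
        # Zero-drift limit: pure Brownian motion
        p_upper = z / a
        mean_fpt = z * (a - z) / s**2
    else:
        # expm1 keeps precision when v is small
        p_upper = np.expm1(-2 * v * z / s**2) / np.expm1(-2 * v * a / s**2)
        mean_fpt = (a * p_upper - z) / v
    return p_upper, mean_fpt

def within_n_standard_errors(samples, reference, n=2.0):
    """True if the sample mean lies within n standard errors of the reference value."""
    samples = np.asarray(samples, dtype=float)
    se = samples.std(ddof=1) / np.sqrt(len(samples))
    return abs(samples.mean() - reference) <= n * se

p_up, mean_t = ddm_reference_values(v=1.0, a=1.5, z=0.75)
p_up0, mean_t0 = ddm_reference_values(v=0.0, a=1.5, z=0.75)
```

To compare against the simulation, pool the finite entries of fpt_upper_sim and fpt_lower_sim before calling within_n_standard_errors, since mean_fpt here is not conditioned on which wall was hit.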
Check
- SDE spec passes consistency (finite drift, pos diffusion, x0 in domain)
- FPE density integral (survival) decreases monotonically from ~1 toward 0
- FPE solution no artifacts (oscillations, neg)
- FPT density non-neg + integrates ~1.0 across boundaries
- Sensitivity monotonic expected (v vs accuracy, a vs mean FPT)
- MC mean FPT within 2 SE of PDE/analytic
- KS max CDF diff sim vs analytic <0.05
Traps
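- Euler-Maruyama step too large: Large dt_sim → trajectories overshoot boundaries or cross and return unseen → biased FPT. Keep dt_sim far below 1/10 expected mean FPT and sigma*sqrt(dt_sim) much smaller than boundary separation, or use boundary-corrected scheme.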
- Truncate FPT series too early: Analytic DDM FPT uses infinite series. <20 terms → visible artifacts at short times. ≥50 + check convergence.
- Ignore numerical diffusion in PDE: 1st-order finite diff → artificial diffusion broadens FPT. Use Crank-Nicolson or higher-order.
- Confuse Ito + Stratonovich: FPE differs by SDE convention. Above assumes Ito. Stratonovich → add noise-induced drift correction.
- Not accounting both boundaries: Two-boundary → total absorption prob = 1.0. Only upper → incorrect stats.
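The Ito/Stratonovich trap above can be made concrete. A Stratonovich SDE dX = f dt + sigma ∘ dW is equivalent to the Ito SDE with drift f + (1/2) * sigma * d(sigma)/dx, so the conversion is a one-line drift correction. The sketch below is illustrative only: the name `stratonovich_to_ito_drift` is hypothetical, and the central-difference step h is an assumed default.

```python
def stratonovich_to_ito_drift(drift_strat, sigma_fn, h=1e-5):
    """Wrap a Stratonovich drift into the equivalent Ito drift.

    Ito drift = Stratonovich drift + 0.5 * sigma * d(sigma)/dx.
    d(sigma)/dx is estimated with a central difference of step h (assumed default).
    """
    def drift_ito(x, t):
        dsigma_dx = (sigma_fn(x + h, t) - sigma_fn(x - h, t)) / (2 * h)
        return drift_strat(x, t) + 0.5 * sigma_fn(x, t) * dsigma_dx
    return drift_ito

# Multiplicative noise sigma(x) = 0.3 x with zero Stratonovich drift
# gives Ito drift 0.5 * 0.3 * 0.3 * x = 0.045 x.
ito_drift = stratonovich_to_ito_drift(lambda x, t: 0.0, lambda x, t: 0.3 * x)
value_at_2 = ito_drift(2.0, 0.0)

# Constant sigma: the correction vanishes and both conventions agree.
same_drift = stratonovich_to_ito_drift(lambda x, t: 0.5, lambda x, t: 1.0)
value_const = same_drift(0.75, 0.0)
```

The returned function has the same (x, t) signature as the drift functions used in the steps above, so it can be passed wherever a drift function is expected.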
Related
- fit-drift-diffusion-model: applies dynamics → estimate params from behavioral data
- implement-diffusion-network: generative diffusion models discretize same SDE framework
- write-testthat-tests: testing numerical solvers + analytic impls
- create-technical-report: document diffusion analysis results
