MCP HubMCP Hub
Retour aux compétences

analyze-diffusion-dynamics

pjt222
Mis à jour 6 days ago
21 vues
17
2
17
Voir sur GitHub
Autreai

À propos

Cette compétence analyse les processus de diffusion en utilisant des équations différentielles stochastiques et des équations de Fokker-Planck pour modéliser l'évolution de la densité de probabilité et calculer les temps de premier passage. Elle aide les développeurs à valider des solutions analytiques par rapport à des simulations et à réaliser des analyses de sensibilité des paramètres de dérive et de diffusion. Utilisez-la lorsque vous travaillez avec des processus de diffusion en temps continu, des problèmes de diffusion bornée, ou pour la validation de dynamiques stochastiques.

Installation rapide

Claude Code

Recommandé
Principal
npx skills add pjt222/agent-almanac -a claude-code
Commande PluginAlternatif
/plugin add https://github.com/pjt222/agent-almanac
Git CloneAlternatif
git clone https://github.com/pjt222/agent-almanac.git ~/.claude/skills/analyze-diffusion-dynamics

Copiez et collez cette commande dans Claude Code pour installer cette compétence

Documentation

析擴散動

定 SDE、推 Fokker-Planck、解首達時、析參敏,以模擬驗。

  • 推連時擴散之概密演→用
  • 算有界擴散之均首達時或全分→用
  • 析漂、擴散係、界參影響→用
  • 以隨擬驗閉式解→用
  • 建漂擴散或生擴散之動覺→用

  • :SDE 規(漂函、擴散係、域/界)
  • :漂擴散函之參值或範
  • :界條(吸、反、混)
  • :暫態析時域(默自察動)
  • :數 PDE 解之空離(默 dx=0.001)
  • :模擬軌數(默 10000)

一:定 SDE 模

定漂、擴散係、界條。

  1. SDE 標 Ito 式:
dX(t) = mu(X, t) dt + sigma(X, t) dW(t)

mu 漂、sigma 擴散係、W(t) 標 Wiener。

  1. 碼:
import numpy as np

class DiffusionProcess:
    """A one-dimensional diffusion process specified by drift and diffusion functions."""

    def __init__(self, drift_fn, diffusion_fn, lower_bound=None, upper_bound=None,
                 boundary_type="absorbing"):
        self.drift = drift_fn
        self.diffusion = diffusion_fn
        self.lower_bound = lower_bound
        self.upper_bound = upper_bound
        self.boundary_type = boundary_type

# Example: Ornstein-Uhlenbeck process on [0, a]
ou_process = DiffusionProcess(
    drift_fn=lambda x, t: 2.0 * (0.5 - x),     # mean-reverting drift
    diffusion_fn=lambda x, t: 0.1,               # constant diffusion
    lower_bound=0.0,
    upper_bound=1.0,
    boundary_type="absorbing"
)

# Example: Standard DDM (constant drift and diffusion)
ddm_process = DiffusionProcess(
    drift_fn=lambda x, t: 0.5,        # drift rate v
    diffusion_fn=lambda x, t: 1.0,    # unit diffusion (s=1, convention)
    lower_bound=0.0,                   # lower absorbing boundary
    upper_bound=1.5,                   # upper absorbing boundary (a)
    boundary_type="absorbing"
)
  1. 定初態:
# Point source at x0
x0 = 0.75  # starting point (e.g., midpoint between boundaries for DDM with z=a/2)

# Or a distribution
initial_distribution = lambda x: np.exp(-50 * (x - 0.75)**2)  # narrow Gaussian
  1. 驗參一致:
def validate_process(process, x0):
    """Check that the SDE specification is self-consistent."""
    assert process.lower_bound < process.upper_bound, "Lower bound must be less than upper bound"
    assert process.lower_bound <= x0 <= process.upper_bound, \
        f"Initial position {x0} outside bounds [{process.lower_bound}, {process.upper_bound}]"
    test_drift = process.drift(x0, 0)
    test_diff = process.diffusion(x0, 0)
    assert np.isfinite(test_drift), f"Drift is not finite at x0={x0}"
    assert test_diff > 0, f"Diffusion coefficient must be positive, got {test_diff}"
    print(f"Process validated: drift={test_drift:.4f}, diffusion={test_diff:.4f} at x0={x0}")

validate_process(ddm_process, x0=0.75)

得:全規 SDE,漂有限、擴散正、初於域內。

敗:擴散域中零或負→過退,察函形。漂於界無窮→考反界宜否。

二:推 Fokker-Planck

SDE 轉概密之偏微分方。

  1. FPE 為 p(x, t):
dp/dt = -d/dx [mu(x,t) * p(x,t)] + (1/2) * d^2/dx^2 [sigma(x,t)^2 * p(x,t)]
  1. 常係(標 DDM)簡為:
dp/dt = -v * dp/dx + (s^2 / 2) * d^2p/dx^2
  1. 數解 FPE 以有限差:
from scipy.sparse import diags
from scipy.sparse.linalg import spsolve

def solve_fokker_planck(process, x0, t_max, dx=0.001, dt=None):
    """Solve the FPE numerically using Crank-Nicolson scheme."""
    x_grid = np.arange(process.lower_bound, process.upper_bound + dx, dx)
    N = len(x_grid)

    if dt is None:
        max_sigma = max(process.diffusion(x, 0) for x in x_grid)
        dt = 0.4 * dx**2 / max_sigma**2  # CFL-like stability condition

    # Initial condition: narrow Gaussian centered at x0
    p = np.exp(-((x_grid - x0)**2) / (2 * (2*dx)**2))
    p[0] = 0  # absorbing boundary
    p[-1] = 0  # absorbing boundary
    p = p / (np.sum(p) * dx)

    t_steps = int(t_max / dt)
    survival = np.zeros(t_steps)
    density_snapshots = []

    for step in range(t_steps):
        mu_vals = np.array([process.drift(x, step*dt) for x in x_grid])
        sigma_vals = np.array([process.diffusion(x, step*dt) for x in x_grid])
        D = 0.5 * sigma_vals**2

        # Finite difference operators (interior points)
        advection = -mu_vals[1:-1] / (2 * dx)
        diffusion_coeff = D[1:-1] / dx**2

        main_diag = 1 + dt * 2 * diffusion_coeff
        upper_diag = dt * (-diffusion_coeff[:-1] - advection[:-1])
        lower_diag = dt * (-diffusion_coeff[1:] + advection[1:])

        A = diags([lower_diag, main_diag, upper_diag], [-1, 0, 1], format="csc")
        p[1:-1] = spsolve(A, p[1:-1])
        p[0] = 0
        p[-1] = 0

        survival[step] = np.sum(p[1:-1]) * dx

        if step % (t_steps // 10) == 0:
            density_snapshots.append((step * dt, p.copy()))

    return x_grid, survival, density_snapshots
  1. 行而繪密演:
import matplotlib.pyplot as plt

x_grid, survival, snapshots = solve_fokker_planck(ddm_process, x0=0.75, t_max=5.0)

fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))
for t_val, density in snapshots:
    ax1.plot(x_grid, density, label=f"t={t_val:.2f}")
ax1.set_xlabel("x")
ax1.set_ylabel("p(x, t)")
ax1.set_title("Fokker-Planck Density Evolution")
ax1.legend()

t_vals = np.linspace(0, 5.0, len(survival))
ax2.plot(t_vals, survival)
ax2.set_xlabel("Time")
ax2.set_ylabel("Survival probability")
ax2.set_title("Survival Probability S(t)")
fig.tight_layout()
fig.savefig("fokker_planck_solution.png", dpi=150)

得:密始為窄峰於 x0,按 SDE 散漂、漸減於界吸。存概單調由 1 至 0 減。

敗:密生振或負→時步太大,減 dt。密不減(存近 1)→界或太遠或漂離兩界。察解中界條。

三:算首達時分

推首達界之時分。

  1. 由存函算 FPT 密:
def first_passage_time_density(survival, dt):
    """FPT density is the negative derivative of survival probability."""
    fpt_density = -np.gradient(survival, dt)
    fpt_density = np.maximum(fpt_density, 0)  # enforce non-negativity
    return fpt_density
  1. 標 DDM 常漂用知析解:
def ddm_fpt_upper(t, v, a, z, s=1.0, n_terms=50):
    """Analytic FPT density at the upper boundary for constant-drift DDM.

    Uses the infinite series representation (large-time expansion).
    """
    if t <= 0:
        return 0.0
    density = 0.0
    for k in range(1, n_terms + 1):
        density += (k * np.pi * s**2 / a**2) * \
            np.exp(-v * (a - z) / s**2 - 0.5 * v**2 * t / s**2) * \
            np.sin(k * np.pi * z / a) * \
            np.exp(-0.5 * (k * np.pi * s / a)**2 * t)
    return density
  1. 算 FPT 分要計:
def fpt_statistics(fpt_density, dt):
    """Compute mean, variance, and quantiles of the FPT distribution."""
    t_vals = np.arange(len(fpt_density)) * dt
    total_mass = np.sum(fpt_density) * dt

    # Normalize
    fpt_normed = fpt_density / total_mass if total_mass > 0 else fpt_density

    mean_fpt = np.sum(t_vals * fpt_normed) * dt
    var_fpt = np.sum((t_vals - mean_fpt)**2 * fpt_normed) * dt

    # Quantiles via CDF
    cdf = np.cumsum(fpt_normed) * dt
    quantile_10 = t_vals[np.searchsorted(cdf, 0.1)]
    quantile_50 = t_vals[np.searchsorted(cdf, 0.5)]
    quantile_90 = t_vals[np.searchsorted(cdf, 0.9)]

    return {
        "mean": mean_fpt,
        "std": np.sqrt(var_fpt),
        "q10": quantile_10,
        "q50": quantile_50,
        "q90": quantile_90,
        "total_probability": total_mass
    }
  1. 雙界題以各界吸壁之概通分 FPT(界格點密之有限差)。

得:FPT 密為右偏單峰。DDM 正漂上界 FPT 質多模短於下界。典 DDM 參(v=1、a=1.5、z=0.75)均 FPT 約 0.5-2.0 秒。

敗:FPT 密有負→數微噪,施小高斯平。兩界全概不近 1.0→時域太短(增 t_max)或解中概漏。

四:析參敏

量參變影 FPT 分。

  1. 定參格:
param_ranges = {
    "v": np.linspace(0.2, 3.0, 15),     # drift rate
    "a": np.linspace(0.5, 2.5, 15),      # boundary separation
    "z_ratio": np.linspace(0.3, 0.7, 9)  # starting point as fraction of a
}

base_params = {"v": 1.0, "a": 1.5, "z_ratio": 0.5}
  1. 各參掃,他持基:
sensitivity_results = {}

for param_name, param_values in param_ranges.items():
    means = []
    accuracies = []
    for val in param_values:
        params = base_params.copy()
        params[param_name] = val
        z = params["z_ratio"] * params["a"]

        process = DiffusionProcess(
            drift_fn=lambda x, t, v=params["v"]: v,
            diffusion_fn=lambda x, t: 1.0,
            lower_bound=0.0,
            upper_bound=params["a"],
            boundary_type="absorbing"
        )

        _, survival, _ = solve_fokker_planck(process, x0=z, t_max=10.0)
        fpt = first_passage_time_density(survival, dt=10.0/len(survival))
        stats = fpt_statistics(fpt, dt=10.0/len(survival))
        means.append(stats["mean"])
        accuracies.append(stats["total_probability"])  # proxy for upper boundary

    sensitivity_results[param_name] = {
        "values": param_values,
        "mean_fpt": np.array(means),
        "accuracy": np.array(accuracies)
    }
  1. 繪敏曲:
fig, axes = plt.subplots(1, 3, figsize=(18, 5))
for idx, (param_name, result) in enumerate(sensitivity_results.items()):
    ax = axes[idx]
    ax.plot(result["values"], result["mean_fpt"], "b-o", label="Mean FPT")
    ax.set_xlabel(param_name)
    ax.set_ylabel("Mean FPT")
    ax.set_title(f"Sensitivity to {param_name}")

    ax2 = ax.twinx()
    ax2.plot(result["values"], result["accuracy"], "r--s", label="P(upper)")
    ax2.set_ylabel("P(upper boundary)")
    ax.legend(loc="upper left")
    ax2.legend(loc="upper right")

fig.tight_layout()
fig.savefig("parameter_sensitivity.png", dpi=150)
  1. 算偏微(基處局敏):
for param_name, result in sensitivity_results.items():
    idx_base = np.argmin(np.abs(result["values"] - base_params[param_name]))
    if idx_base > 0 and idx_base < len(result["values"]) - 1:
        d_mean = (result["mean_fpt"][idx_base+1] - result["mean_fpt"][idx_base-1]) / \
                 (result["values"][idx_base+1] - result["values"][idx_base-1])
        print(f"d(mean_FPT)/d({param_name}) at baseline: {d_mean:.4f}")

得:v 對均 FPT 強負、對精強正。a 對均 FPT 強正(速精權衡)。z 移精影均 FPT 較小。

敗:敏曲平或非單調→察參範足且解時域捕全 FPT 分。漂率對均 FPT 非單調示解蟲。

五:以模擬驗析

行 SDE 蒙地卡羅以驗析與數 PDE。

  1. Euler-Maruyama 模擬 SDE:
def simulate_sde(process, x0, dt_sim=0.0001, t_max=10.0, n_trajectories=10000):
    """Simulate SDE paths and record first-passage times."""
    n_steps = int(t_max / dt_sim)
    fpt_upper = np.full(n_trajectories, np.nan)
    fpt_lower = np.full(n_trajectories, np.nan)

    x = np.full(n_trajectories, x0)
    sqrt_dt = np.sqrt(dt_sim)

    for step in range(n_steps):
        t = step * dt_sim
        active = np.isnan(fpt_upper) & np.isnan(fpt_lower)
        if not active.any():
            break

        mu = np.array([process.drift(xi, t) for xi in x[active]])
        sigma = np.array([process.diffusion(xi, t) for xi in x[active]])
        dW = np.random.randn(active.sum()) * sqrt_dt

        x[active] += mu * dt_sim + sigma * dW

        # Check boundary crossings
        hit_upper = active & (x >= process.upper_bound)
        hit_lower = active & (x <= process.lower_bound)
        fpt_upper[hit_upper] = (step + 1) * dt_sim
        fpt_lower[hit_lower] = (step + 1) * dt_sim

    return fpt_upper, fpt_lower
  1. 行模擬而算經驗 FPT 分:
fpt_upper_sim, fpt_lower_sim = simulate_sde(ddm_process, x0=0.75, n_trajectories=50000)

# Empirical statistics
valid_upper = fpt_upper_sim[~np.isnan(fpt_upper_sim)]
valid_lower = fpt_lower_sim[~np.isnan(fpt_lower_sim)]
total_absorbed = len(valid_upper) + len(valid_lower)
accuracy_sim = len(valid_upper) / total_absorbed

print(f"Simulated accuracy: {accuracy_sim:.4f}")
print(f"Mean FPT (upper): {valid_upper.mean():.4f} +/- {valid_upper.std()/np.sqrt(len(valid_upper)):.4f}")
print(f"Mean FPT (lower): {valid_lower.mean():.4f} +/- {valid_lower.std()/np.sqrt(len(valid_lower)):.4f}")
  1. 比模擬於析或數 PDE:
fig, ax = plt.subplots(figsize=(10, 6))

# Empirical histogram
ax.hist(valid_upper, bins=100, density=True, alpha=0.5, label="Simulation (upper)")
ax.hist(valid_lower, bins=100, density=True, alpha=0.5, label="Simulation (lower)")

# Analytical solution overlay
t_vals_analytic = np.linspace(0.01, 5.0, 500)
v, a, z = 0.5, 1.5, 0.75
fpt_analytic = [ddm_fpt_upper(t, v, a, z) for t in t_vals_analytic]
ax.plot(t_vals_analytic, fpt_analytic, "k-", linewidth=2, label="Analytic (upper)")

ax.set_xlabel("First-passage time")
ax.set_ylabel("Density")
ax.set_title("FPT Distribution: Simulation vs. Analytic")
ax.legend()
fig.savefig("fpt_validation.png", dpi=150)
  1. 量法間合:
from scipy.stats import ks_2samp

# Kolmogorov-Smirnov test between simulated and analytically-derived samples
analytic_cdf = np.cumsum(fpt_analytic) * (t_vals_analytic[1] - t_vals_analytic[0])
sim_sorted = np.sort(valid_upper)
sim_cdf = np.arange(1, len(sim_sorted)+1) / len(sim_sorted)

# Interpolate analytic CDF at simulation quantiles
from scipy.interpolate import interp1d
analytic_interp = interp1d(t_vals_analytic, analytic_cdf, bounds_error=False, fill_value=(0, 1))
max_diff = np.max(np.abs(sim_cdf - analytic_interp(sim_sorted)))
print(f"Max CDF difference (simulation vs. analytic): {max_diff:.4f}")
assert max_diff < 0.05, f"Simulation and analytic FPT differ by {max_diff:.4f} (threshold: 0.05)"

得:模擬直方密合析 FPT 曲。50000 軌 KS 測最大 CDF 差 <0.05。模擬均 FPT 於析值 2 標誤內。

敗:模擬異於析→先察 Euler-Maruyama 步——dt_sim 須小至界越不漏(試 dt_sim=0.00001)。析級不收→增 n_terms。非常係無析解時,比兩數法(PDE 解對模擬)相互。

  • SDE 規通一致察(漂有限、擴散正、x0 於域)
  • Fokker-Planck 密積為時單調減(存函)
  • Fokker-Planck 解無數偽(振、負)
  • FPT 密非負且兩界積近 1.0
  • 敏析示預期單調關(v 對精、a 對均 FPT)
  • 蒙地卡羅均 FPT 於 PDE/析解之 2 標誤內
  • KS 測模擬於析最大 CDF 差 <0.05

  • Euler-Maruyama 步太大:大 dt_sim 致軌過界,致 FPT 估偏。dt_sim 至多預期均 FPT 之 1/10,或用界正解
  • FPT 級截太早:析 DDM FPT 密用無窮級。太少(<20)致見偽,尤短時。用至少 50 項驗收
  • 忽 PDE 中數擴散:一階有限差致人為擴散展 FPT 分。為精用 Crank-Nicolson 或更高階
  • 混 Ito 與 Stratonovich:Fokker-Planck 隨 SDE 例異。上標式設 Ito 微積。Stratonovich 須加噪致漂正
  • 不計兩界:雙界題全吸概須和為 1.0。唯報上界 FPT 而忽下界致誤計

  • fit-drift-diffusion-model — 施此動於行為數估參
  • implement-diffusion-network — 生擴散模離同 SDE 框
  • write-testthat-tests — 測數解與析施
  • create-technical-report — 文擴散析果

Dépôt GitHub

pjt222/agent-almanac
Chemin: i18n/wenyan-ultra/skills/analyze-diffusion-dynamics
0
agentsagentskillsai-assisted-developmentclaude-codeskillsteams

Compétences associées

llamaguard

Autre

LlamaGuard est le modèle de Meta, doté de 7 à 8 milliards de paramètres, conçu pour modérer les entrées et sorties des LLM selon six catégories de sécurité comme la violence et les discours haineux. Il offre une précision de 94 à 95 % et peut être déployé avec vLLM, Hugging Face ou Amazon SageMaker. Utilisez cette compétence pour intégrer facilement le filtrage de contenu et des garde-fous de sécurité dans vos applications d'IA.

Voir la compétence

cost-optimization

Autre

Cette compétence de Claude aide les développeurs à optimiser les coûts du cloud grâce au redimensionnement des ressources, aux stratégies d'étiquetage et à l'analyse des dépenses. Elle fournit un cadre pour réduire les dépenses cloud et mettre en œuvre une gouvernance des coûts sur AWS, Azure et GCP. Utilisez-la lorsque vous devez analyser les coûts d'infrastructure, redimensionner les ressources ou respecter des contraintes budgétaires.

Voir la compétence

quantizing-models-bitsandbytes

Autre

Cette compétence quantifie les LLMs en précision 8 bits ou 4 bits à l'aide de bitsandbytes, permettant une réduction de 50 à 75 % de la mémoire utilisée avec une perte de précision minime. Elle est idéale pour exécuter des modèles plus volumineux sur une mémoire GPU limitée ou pour accélérer l'inférence, prenant en charge des formats comme INT8, NF4 et FP4. La compétence s'intègre à HuggingFace Transformers et permet l'entraînement QLoRA ainsi que l'utilisation d'optimiseurs en 8 bits.

Voir la compétence

dispatching-parallel-agents

Autre

Cette compétence Claude déploie plusieurs agents pour enquêter et résoudre simultanément 3 problèmes indépendants ou plus. Elle est conçue pour des scénarios impliquant des défaillances non liées qui peuvent être résolues sans état partagé ni dépendances. La capacité fondamentale est la résolution de problèmes en parallèle, en assignant un agent par domaine problématique indépendant afin de maximiser l'efficacité.

Voir la compétence