elixir-otp-concurrency
About
This skill provides guidance for building concurrent, fault-tolerant systems in Elixir using OTP. Use it when implementing GenServer, Supervisor, or other OTP behaviors, designing supervision trees, or managing processes and application state. It helps with creating robust, concurrent architectures and troubleshooting process-related issues.
Documentation
Elixir OTP and Concurrency
This skill activates when working with OTP behaviors, building concurrent systems, managing processes, or implementing fault-tolerant architectures in Elixir.
When to Use This Skill
Activate when:
- Implementing GenServer, GenStage, Supervisor, or other OTP behaviors
- Designing supervision trees and fault-tolerance strategies
- Working with Tasks, Agents, or process management
- Building concurrent or distributed systems
- Managing application state
- Troubleshooting process-related issues
OTP Behaviors
GenServer - Generic Server
Use GenServer for stateful processes:
```elixir
defmodule MyApp.Counter do
  use GenServer

  # Client API

  def start_link(initial_value) do
    GenServer.start_link(__MODULE__, initial_value, name: __MODULE__)
  end

  def increment do
    GenServer.call(__MODULE__, :increment)
  end

  def get_value do
    GenServer.call(__MODULE__, :get)
  end

  # Server Callbacks

  @impl true
  def init(initial_value) do
    {:ok, initial_value}
  end

  @impl true
  def handle_call(:increment, _from, state) do
    {:reply, state + 1, state + 1}
  end

  @impl true
  def handle_call(:get, _from, state) do
    {:reply, state, state}
  end
end
```
GenServer Best Practices
- Use `call` for synchronous requests that need a response
- Use `cast` for asynchronous fire-and-forget messages
- Use `handle_info` for plain messages: anything sent with `send/2`, monitor `:DOWN` notifications, and timer messages
- Keep server callbacks fast; delegate heavy work to Tasks
- Name processes with `:via` tuples or Registry for dynamic naming
- Choose call timeouts deliberately: `GenServer.call/3` defaults to 5,000 ms and exits the caller when the timeout expires
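To make the call/cast/handle_info distinction concrete, here is a minimal runnable sketch. `Demo.Stack` is a hypothetical module invented for illustration; it shows a synchronous call with an explicit 1-second timeout, a fire-and-forget cast, and a raw message handled by `handle_info/2`.

```elixir
defmodule Demo.Stack do
  use GenServer

  # Client API

  def start_link(items), do: GenServer.start_link(__MODULE__, items)

  # Synchronous: the caller blocks until the reply arrives, for at most 1_000 ms
  def pop(pid), do: GenServer.call(pid, :pop, 1_000)

  # Asynchronous: returns :ok immediately, without waiting for the server
  def push(pid, item), do: GenServer.cast(pid, {:push, item})

  # Server callbacks

  @impl true
  def init(items), do: {:ok, items}

  @impl true
  def handle_call(:pop, _from, [head | tail]), do: {:reply, {:ok, head}, tail}
  def handle_call(:pop, _from, []), do: {:reply, :empty, []}

  @impl true
  def handle_cast({:push, item}, items), do: {:noreply, [item | items]}

  # Plain messages sent with send/2 arrive here; anything unexpected is ignored
  @impl true
  def handle_info({:push_raw, item}, items), do: {:noreply, [item | items]}
  def handle_info(_other, items), do: {:noreply, items}
end

{:ok, pid} = Demo.Stack.start_link([1])
:ok = Demo.Stack.push(pid, 2)
send(pid, {:push_raw, 3})

# Messages from one sender arrive in order, so this call is handled after both pushes
{:ok, 3} = Demo.Stack.pop(pid)
```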
GenServer Patterns
Background Work:
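```elixir
def init(state) do
  schedule_work()
  {:ok, state}
end

def handle_info(:work, state) do
  # do_work/1 stands for your own periodic job
  do_work(state)
  schedule_work()
  {:noreply, state}
end

defp schedule_work do
  # Re-arm the timer after each run: 5_000 ms between runs
  Process.send_after(self(), :work, 5_000)
end
```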
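Idle Timeouts:
A GenServer callback can return a timeout (in milliseconds) as the last element of its result. If no message arrives within that time, the server receives `:timeout` in `handle_info/2`; any incoming message cancels it. (State timeouts such as `{:state_timeout, ...}` belong to `:gen_statem`, not GenServer.)
```elixir
def handle_call(:get, _from, state) do
  # Stop the server if it stays idle for 30 seconds after this reply
  {:reply, state, state, 30_000}
end

def handle_info(:timeout, state) do
  {:stop, :normal, state}
end
```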
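The idle-timeout behaviour can be observed directly in a short script. This is a sketch with a hypothetical `Demo.IdleSession` module and a 100 ms timeout chosen only so the example finishes quickly; the caller monitors the server and waits for it to stop on its own.

```elixir
defmodule Demo.IdleSession do
  use GenServer

  # Deliberately short so the demo finishes fast; real sessions use seconds or minutes
  @idle_ms 100

  def start(state), do: GenServer.start(__MODULE__, state)

  @impl true
  def init(state), do: {:ok, state, @idle_ms}

  @impl true
  def handle_call(:get, _from, state), do: {:reply, state, state, @idle_ms}

  # Delivered only if no other message arrives within @idle_ms
  @impl true
  def handle_info(:timeout, state), do: {:stop, :normal, state}
end

{:ok, pid} = Demo.IdleSession.start(:some_state)
ref = Process.monitor(pid)
:some_state = GenServer.call(pid, :get)

outcome =
  receive do
    {:DOWN, ^ref, :process, ^pid, :normal} -> :stopped_after_idle
  after
    1_000 -> :still_running
  end
```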
Supervisor - Process Supervision
Build supervision trees for fault tolerance:
```elixir
defmodule MyApp.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      # Database connection pool
      {MyApp.Repo, []},
      # PubSub system
      {Phoenix.PubSub, name: MyApp.PubSub},
      # Custom supervisor
      {MyApp.WorkerSupervisor, []},
      # Individual workers
      {MyApp.Cache, []},
      {MyApp.RateLimiter, []},
      # Web endpoint
      MyAppWeb.Endpoint
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end
```
Supervision Strategies
:one_for_one - If a child dies, only that child is restarted
Supervisor.start_link(children, strategy: :one_for_one)
:one_for_all - If any child dies, all children are terminated and restarted
Supervisor.start_link(children, strategy: :one_for_all)
:rest_for_one - If a child dies, it and all children started after it are restarted
Supervisor.start_link(children, strategy: :rest_for_one)
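A quick way to see `:rest_for_one` in action, assuming three plain Agents stand in for real workers (the ids `:a`, `:b`, `:c` are arbitrary): killing the middle child restarts it and the child started after it, while the first child keeps its pid.

```elixir
children = [
  Supervisor.child_spec({Agent, fn -> :a end}, id: :a),
  Supervisor.child_spec({Agent, fn -> :b end}, id: :b),
  Supervisor.child_spec({Agent, fn -> :c end}, id: :c)
]

{:ok, sup} = Supervisor.start_link(children, strategy: :rest_for_one)

# Map each child id to its current pid
child_pids = fn ->
  sup
  |> Supervisor.which_children()
  |> Map.new(fn {id, pid, _type, _modules} -> {id, pid} end)
end

before = child_pids.()
Process.exit(before.b, :kill)

# Give the supervisor a moment to restart the affected children
Process.sleep(100)
after_kill = child_pids.()
```

Running the same script with `:one_for_one` would leave `:c` untouched; `:one_for_all` would replace all three.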
Dynamic Supervisors
For dynamically creating processes:
```elixir
defmodule MyApp.WorkerSupervisor do
  use DynamicSupervisor

  def start_link(init_arg) do
    DynamicSupervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
  end

  def start_worker(args) do
    spec = {MyApp.Worker, args}
    DynamicSupervisor.start_child(__MODULE__, spec)
  end

  @impl true
  def init(_init_arg) do
    DynamicSupervisor.init(strategy: :one_for_one)
  end
end
```
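Starting and stopping children on demand looks roughly like this. The sketch starts an anonymous DynamicSupervisor and uses Agents as stand-in workers instead of `MyApp.Worker`, which is not defined here.

```elixir
{:ok, sup} = DynamicSupervisor.start_link(strategy: :one_for_one)

# Start three children on demand; each Agent simply holds its index
for i <- 1..3 do
  {:ok, _pid} = DynamicSupervisor.start_child(sup, {Agent, fn -> i end})
end

%{active: 3} = DynamicSupervisor.count_children(sup)

# Children can also be removed explicitly by pid
[{_, first, _, _} | _] = DynamicSupervisor.which_children(sup)
:ok = DynamicSupervisor.terminate_child(sup, first)
```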
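Restart Strategies
Configure child restart behavior. A plain `{Module, arg}` tuple passes `arg` to `start_link/1`, so restart options must be set on the child spec itself, either with `Supervisor.child_spec/2` or with `use GenServer, restart: ...` inside the module:
```elixir
children = [
  # Always restart (the default for GenServer children)
  Supervisor.child_spec({MyApp.CriticalWorker, []}, restart: :permanent),
  # Never restart
  Supervisor.child_spec({MyApp.OneTimeTask, []}, restart: :temporary),
  # Restart only on abnormal exit (any reason other than :normal, :shutdown, or {:shutdown, term})
  Supervisor.child_spec({MyApp.OptionalWorker, []}, restart: :transient)
]
```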
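The `:transient` option can also be set inside the module via `use GenServer`. In this sketch, `Demo.OneShot` is a hypothetical worker that finishes with a `:normal` exit; the supervisor keeps its spec but does not restart it, so `which_children/1` reports `:undefined` for its pid.

```elixir
defmodule Demo.OneShot do
  # The generated child_spec/1 carries restart: :transient
  use GenServer, restart: :transient

  def start_link(arg), do: GenServer.start_link(__MODULE__, arg)

  @impl true
  def init(arg), do: {:ok, arg}

  # Reply :ok to the caller, then stop with reason :normal
  @impl true
  def handle_call(:finish, _from, state), do: {:stop, :normal, :ok, state}
end

{:ok, sup} = Supervisor.start_link([{Demo.OneShot, nil}], strategy: :one_for_one)
[{Demo.OneShot, pid, :worker, _}] = Supervisor.which_children(sup)

# A :normal exit of a :transient child is not restarted
:ok = GenServer.call(pid, :finish)
Process.sleep(100)
```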
Task - Concurrent Work
Fire-and-forget Tasks
For concurrent work without needing results (the task is neither linked nor supervised, so a crash is only logged):
```elixir
Task.start(fn ->
  send_email(user, "Welcome!")
end)
```
Awaited Tasks
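For concurrent work with results (the task is linked to the caller, so a crash in the task also crashes the caller):
```elixir
task = Task.async(fn ->
  expensive_computation()
end)

# Do other work...

result = Task.await(task, 5000) # 5 second timeout; the caller exits if it is exceeded
```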
Supervised Tasks
For long-running tasks under supervision (tasks started with `Task.Supervisor.start_child/2` default to `restart: :temporary`, so they are shut down cleanly with the tree but not restarted):
```elixir
defmodule MyApp.Application do
  use Application

  def start(_type, _args) do
    children = [
      {Task.Supervisor, name: MyApp.TaskSupervisor}
    ]

    Supervisor.start_link(children, strategy: :one_for_one)
  end
end

# Use the supervised task
Task.Supervisor.start_child(MyApp.TaskSupervisor, fn ->
  long_running_operation()
end)
```
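`Task.Supervisor` also offers `async_nolink/2` for supervised tasks whose results you want without linking them to the caller. A minimal sketch: the task below deliberately raises, and the caller gets the failure back as data from `Task.yield/2` instead of crashing (the error is still logged).

```elixir
{:ok, sup} = Task.Supervisor.start_link()

# Supervised but not linked: a crash inside the task does not take the caller down
task = Task.Supervisor.async_nolink(sup, fn -> raise "boom" end)

result =
  case Task.yield(task, 1_000) || Task.shutdown(task) do
    {:ok, value} -> {:ok, value}
    {:exit, reason} -> {:error, reason}
    nil -> {:error, :timeout}
  end
```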
Concurrent Map
Process collections concurrently:
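```elixir
# Sequential
results = Enum.map(urls, &fetch_url/1)

# Concurrent: at most 10 fetches at a time; each element arrives as {:ok, result}
results =
  urls
  |> Task.async_stream(&fetch_url/1, max_concurrency: 10)
  |> Enum.map(fn {:ok, result} -> result end)
```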
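A runnable variant of the concurrent map, with a hypothetical `slow_square` function standing in for `fetch_url/1`. It shows two properties of `Task.async_stream/3` worth knowing: results keep input order by default (`ordered: true`), and each task gets its own timeout (5,000 ms unless overridden; on timeout the caller exits unless `on_timeout: :kill_task` is given).

```elixir
# Hypothetical stand-in for fetch_url/1: a little work, then a result
slow_square = fn n ->
  Process.sleep(10)
  n * n
end

results =
  1..5
  |> Task.async_stream(slow_square, max_concurrency: 2, timeout: 1_000)
  |> Enum.map(fn {:ok, value} -> value end)
```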
Agent - Simple State Management
Use Agent for simple state:
```elixir
{:ok, _pid} = Agent.start_link(fn -> %{} end, name: MyApp.Cache)

# Get state
value = Agent.get(MyApp.Cache, fn state -> Map.get(state, :key) end)

# Update state
Agent.update(MyApp.Cache, fn state -> Map.put(state, :key, value) end)

# Get and update atomically: returns the old value and removes the key
Agent.get_and_update(MyApp.Cache, fn state ->
  {Map.get(state, :key), Map.delete(state, :key)}
end)
```
When to use Agent vs GenServer:
- Use Agent for simple key-value state
- Use GenServer when you need complex logic, callbacks, or process lifecycle management
Process Communication
send/receive
Basic message passing:
```elixir
# Send message
send(pid, {:hello, "world"})

# Receive message
receive do
  {:hello, msg} -> IO.puts(msg)
after
  5000 -> IO.puts("Timeout")
end
```
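Bare send/receive is usually paired with a unique reference so that a reply can be matched to its request. A minimal sketch with a hypothetical one-shot echo process: `make_ref/0` creates the tag, and pinning it with `^ref` makes the receive ignore any other messages in the mailbox.

```elixir
# A tiny echo server: replies once, tagging the reply with the caller's ref
server =
  spawn(fn ->
    receive do
      {:echo, from, ref, payload} -> send(from, {:reply, ref, payload})
    end
  end)

ref = make_ref()
send(server, {:echo, self(), ref, "hello"})

# ^ref selects only the reply to this request
reply =
  receive do
    {:reply, ^ref, payload} -> payload
  after
    1_000 -> :timeout
  end
```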
Process Registration
Register processes by name:
```elixir
# Local registration
Process.register(self(), :my_process)
send(:my_process, :hello)

# Via Registry
{:ok, _} = Registry.start_link(keys: :unique, name: MyApp.Registry)

{:ok, pid} =
  GenServer.start_link(MyWorker, nil,
    name: {:via, Registry, {MyApp.Registry, "worker_1"}}
  )

# Look up process
[{pid, _}] = Registry.lookup(MyApp.Registry, "worker_1")
```
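The same `:via` tuple works with any OTP process that accepts a `name` option. This sketch uses one Agent per hypothetical user id with a registry named `Demo.Registry`; callers address the process through its key, and a second start with the same key returns `{:error, {:already_started, pid}}`.

```elixir
{:ok, _} = Registry.start_link(keys: :unique, name: Demo.Registry)

# Hypothetical naming scheme: one process per user id
via = fn user_id -> {:via, Registry, {Demo.Registry, {:user, user_id}}} end

{:ok, pid} = Agent.start_link(fn -> 0 end, name: via.(42))

# Callers use the key, never the pid
:ok = Agent.update(via.(42), &(&1 + 1))
1 = Agent.get(via.(42), & &1)

[{^pid, _value}] = Registry.lookup(Demo.Registry, {:user, 42})

# The key is unique, so a second registration is rejected
{:error, {:already_started, ^pid}} = Agent.start_link(fn -> 0 end, name: via.(42))
```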
Process Links and Monitors
Links - Bidirectional, propagate exits:
```elixir
# Link processes
Process.link(pid)

# Spawn linked
spawn_link(fn -> do_work() end)
```
Monitors - Unidirectional, receive DOWN messages:
```elixir
ref = Process.monitor(pid)

receive do
  {:DOWN, ^ref, :process, ^pid, reason} ->
    IO.puts("Process died: #{inspect(reason)}")
end
```
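Links can also be observed without dying: a process that traps exits receives a linked process's exit as an `{:EXIT, pid, reason}` message. A minimal sketch (the `:boom` reason is arbitrary):

```elixir
# Trap exits so the linked crash arrives as a message instead of killing us
Process.flag(:trap_exit, true)

pid = spawn_link(fn -> exit(:boom) end)

link_result =
  receive do
    {:EXIT, ^pid, reason} -> {:linked_exit, reason}
  after
    1_000 -> :timeout
  end

# Restore the default behaviour
Process.flag(:trap_exit, false)
```

Trapping exits is normally reserved for processes that must clean up or supervise others; most workers should simply let the link take them down.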
Concurrency Patterns
Pipeline Pattern
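Run each input through a chain of steps, processing many inputs concurrently. The steps for one input depend on each other and run in order; the concurrency comes from handling different inputs in parallel:
```elixir
defmodule Pipeline do
  def process(items) when is_list(items) do
    items
    |> Enum.map(fn item ->
      Task.async(fn -> item |> step1() |> step2() |> step3() end)
    end)
    |> Task.await_many()
  end

  # Placeholder steps; replace with real work
  defp step1(x), do: x
  defp step2(x), do: x
  defp step3(x), do: x
end
```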
Worker Pool
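Spread work round-robin across a fixed set of Task.Supervisor processes. The GenServer only hands out a supervisor; the caller starts the task itself, because a task can only be awaited by the process that started it. Note that this spreads load but does not cap how many tasks run at once:
```elixir
defmodule MyApp.WorkerPool do
  use GenServer

  def start_link(opts) do
    pool_size = Keyword.get(opts, :size, 10)
    GenServer.start_link(__MODULE__, pool_size, name: __MODULE__)
  end

  # Returns a Task owned by the caller; await it with Task.await/2 or Task.yield/2
  def execute(fun) do
    supervisor = GenServer.call(__MODULE__, :next_supervisor)
    Task.Supervisor.async_nolink(supervisor, fun)
  end

  @impl true
  def init(pool_size) do
    supervisors =
      for _ <- 1..pool_size do
        {:ok, pid} = Task.Supervisor.start_link()
        pid
      end

    {:ok, %{supervisors: supervisors, index: 0}}
  end

  @impl true
  def handle_call(:next_supervisor, _from, state) do
    supervisor = Enum.at(state.supervisors, state.index)
    new_index = rem(state.index + 1, length(state.supervisors))
    {:reply, supervisor, %{state | index: new_index}}
  end
end
```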
Backpressure with GenStage
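For producer-consumer pipelines with demand-driven backpressure (GenStage is a separate package, `gen_stage` on Hex, not part of Elixir itself):
```elixir
defmodule Producer do
  use GenStage

  def start_link(initial) do
    GenStage.start_link(__MODULE__, initial, name: __MODULE__)
  end

  @impl true
  def init(initial) do
    {:producer, initial}
  end

  # Emit exactly as many events as consumers asked for
  @impl true
  def handle_demand(demand, state) when demand > 0 do
    events = Enum.to_list(state..(state + demand - 1))
    {:noreply, events, state + demand}
  end
end

defmodule Consumer do
  use GenStage

  def start_link(_arg) do
    GenStage.start_link(__MODULE__, :ok)
  end

  @impl true
  def init(:ok) do
    # Subscribe to the named producer; demand is then sent upstream automatically
    {:consumer, :ok, subscribe_to: [Producer]}
  end

  @impl true
  def handle_events(events, _from, state) do
    Enum.each(events, &process_event/1)
    {:noreply, [], state}
  end

  # Placeholder for your own per-event work
  defp process_event(event), do: IO.inspect(event)
end
```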
ETS - Erlang Term Storage
In-memory key-value storage:
```elixir
# Create table (owned by the calling process)
:ets.new(:my_table, [:named_table, :public, read_concurrency: true])

# Insert
:ets.insert(:my_table, {:key, "value"})

# Lookup
[{:key, value}] = :ets.lookup(:my_table, :key)

# Match patterns: returns a list of binding lists, here [[:key]]
:ets.match(:my_table, {:"$1", "value"})

# Iterate
:ets.foldl(fn {k, v}, acc -> [{k, v} | acc] end, [], :my_table)

# Delete
:ets.delete(:my_table, :key)
```
ETS Best Practices
- Use `read_concurrency: true` for read-heavy workloads
- Use `write_concurrency: true` for write-heavy workloads
- Prefer `:set` (the default) for unique keys
- Use `:bag` or `:duplicate_bag` for multiple values per key
- Create tables from a long-lived, supervised process such as a GenServer: a table is deleted when its owner process exits
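One common way to follow the ownership advice is a GenServer that creates the table in `init/1` and serializes writes, while readers query ETS directly. This is a sketch under assumptions: `Demo.Cache` and the `:demo_cache` table name are hypothetical, and `:protected` access is chosen so that only the owner can write.

```elixir
defmodule Demo.Cache do
  use GenServer

  @table :demo_cache

  def start_link(_opts), do: GenServer.start_link(__MODULE__, nil, name: __MODULE__)

  # Reads go straight to ETS, so they never queue behind the GenServer
  def get(key) do
    case :ets.lookup(@table, key) do
      [{^key, value}] -> {:ok, value}
      [] -> :error
    end
  end

  # Writes go through the owner process
  def put(key, value), do: GenServer.call(__MODULE__, {:put, key, value})

  @impl true
  def init(nil) do
    # This GenServer owns the table; :protected = everyone reads, only the owner writes
    table = :ets.new(@table, [:named_table, :protected, :set, read_concurrency: true])
    {:ok, table}
  end

  @impl true
  def handle_call({:put, key, value}, _from, table) do
    true = :ets.insert(table, {key, value})
    {:reply, :ok, table}
  end
end

{:ok, _pid} = Demo.Cache.start_link([])
:ok = Demo.Cache.put(:greeting, "hi")
{:ok, "hi"} = Demo.Cache.get(:greeting)
```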
Error Handling and Fault Tolerance
Let It Crash Philosophy
Design for failure:
```elixir
# Avoid defensive checks for failures you cannot handle here
def process_order(order_id) do
  # Let it crash if order doesn't exist
  order = Repo.get!(Order, order_id)

  # Let it crash if validation fails
  {:ok, processed} = process(order)
  processed
end
```
Proper Error Handling
When to handle errors vs let crash:
```elixir
# Handle expected errors
def fetch_user(id) do
  case HTTPoison.get("#{@api_url}/users/#{id}") do
    {:ok, %{status_code: 200, body: body}} ->
      Jason.decode(body)

    {:ok, %{status_code: 404}} ->
      {:error, :not_found}

    {:ok, %{status_code: status}} ->
      {:error, {:unexpected_status, status}}

    {:error, reason} ->
      {:error, {:network_error, reason}}
  end
end

# Let unexpected errors crash
def update_user!(id, params) do
  user = Repo.get!(User, id) # Crash if not found

  user
  |> User.changeset(params)
  |> Repo.update!() # Crash if invalid
end
```
Circuit Breaker
Prevent cascading failures. This simplified breaker opens after 5 consecutive failures and closes again after 30 seconds; it has no separate half-open trial state:
```elixir
defmodule CircuitBreaker do
  use GenServer

  @failure_threshold 5
  @reset_after_ms 30_000

  def start_link(_) do
    GenServer.start_link(__MODULE__, %{status: :closed, failures: 0}, name: __MODULE__)
  end

  # fun runs in the caller's process, so a slow call never blocks the breaker
  def call(fun) do
    case GenServer.call(__MODULE__, :status) do
      :open -> {:error, :circuit_open}
      :closed -> execute(fun)
    end
  end

  defp execute(fun) do
    try do
      result = fun.()
      GenServer.cast(__MODULE__, :success)
      {:ok, result}
    rescue
      e ->
        GenServer.cast(__MODULE__, :failure)
        {:error, e}
    end
  end

  @impl true
  def init(state), do: {:ok, state}

  @impl true
  def handle_call(:status, _from, state) do
    {:reply, state.status, state}
  end

  @impl true
  def handle_cast(:success, state) do
    {:noreply, %{state | failures: 0, status: :closed}}
  end

  @impl true
  def handle_cast(:failure, state) do
    new_failures = state.failures + 1

    # Trip only on the closed -> open transition so one reset timer is scheduled
    if state.status == :closed and new_failures >= @failure_threshold do
      Process.send_after(self(), :reset, @reset_after_ms)
      {:noreply, %{state | failures: new_failures, status: :open}}
    else
      {:noreply, %{state | failures: new_failures}}
    end
  end

  @impl true
  def handle_info(:reset, state) do
    {:noreply, %{state | status: :closed, failures: 0}}
  end
end
```
Testing Concurrent Systems
Testing GenServers
```elixir
defmodule MyApp.CounterTest do
  # async: false because MyApp.Counter registers a fixed name
  use ExUnit.Case, async: false

  test "increments counter" do
    # start_supervised!/1 stops the process when the test ends
    start_supervised!({MyApp.Counter, 0})

    assert MyApp.Counter.increment() == 1
    assert MyApp.Counter.increment() == 2
    assert MyApp.Counter.get_value() == 2
  end
end
```
Testing Asynchronous Processes
```elixir
test "process receives message" do
  parent = self()

  pid =
    spawn(fn ->
      receive do
        :ping -> send(parent, :pong)
      end
    end)

  send(pid, :ping)
  assert_receive :pong, 1000
end
```
Testing Supervision
```elixir
test "supervisor restarts crashed worker" do
  {:ok, sup} = Supervisor.start_link([MyApp.Worker], strategy: :one_for_one)
  [{_, worker_pid, _, _}] = Supervisor.which_children(sup)

  # Crash the worker
  Process.exit(worker_pid, :kill)

  # Wait for restart
  Process.sleep(100)

  # Verify new worker started
  [{_, new_pid, _, _}] = Supervisor.which_children(sup)
  assert new_pid != worker_pid
  assert Process.alive?(new_pid)
end
```
Debugging Concurrent Systems
Observer
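Launch Observer for visual process inspection (it needs an Erlang build with wx; in a Mix project you may also need to add `:observer` and `:wx` to `extra_applications`):
```elixir
:observer.start()
```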
Process Info
Inspect running processes:
```elixir
# List all processes
Process.list()

# Process information
Process.info(pid)

# Message queue length
{:message_queue_len, count} = Process.info(pid, :message_queue_len)

# Current function
{:current_function, {mod, fun, arity}} = Process.info(pid, :current_function)
```
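Message queue length is often the first thing to check when a process falls behind. The sketch below fakes a stuck consumer with a process that never reads its mailbox; it is illustrative only.

```elixir
# A process that never reads its mailbox, simulating a stuck consumer
pid = spawn(fn -> Process.sleep(:infinity) end)

for i <- 1..3, do: send(pid, {:job, i})

{:message_queue_len, 3} = Process.info(pid, :message_queue_len)

# Process.info/2 returns nil once the process is gone
Process.exit(pid, :kill)
Process.sleep(50)
```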
Tracing
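Use the `:sys` module to inspect OTP processes (GenServer, Supervisor, Agent, and similar) while debugging:
```elixir
# Enable tracing: system events are printed to standard output
:sys.trace(pid, true)

# Disable tracing again when done
:sys.trace(pid, false)

# Get state
:sys.get_state(pid)

# Get status
:sys.get_status(pid)
```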
Performance Considerations
Process Spawning
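- Processes are lightweight: a newly spawned process takes about 338 words (roughly 2.7 KB on a 64-bit VM), including its initial heap
- Running hundreds of thousands of processes is normal; the default limit is 262,144 and can be raised with the `+P` emulator flag
- Use process pools when the spawn rate is very high
Message Passing
- Messages are copied between processes; binaries larger than 64 bytes are the exception, since they are reference-counted and shared
- Large messages are expensive: for large, rarely changing data consider `:persistent_term` (reads are not copied), and keep in mind that ETS reads and writes also copy
- Prefer binaries for transferring large data efficiently
Bottlenecks
- A single GenServer handles one message at a time and can become a bottleneck
- Solution: shard state across multiple processes
- Use ETS with `read_concurrency: true` for read-heavy workloads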
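Sharding can be as simple as hashing a key onto one of N processes. A minimal sketch, assuming Agents are acceptable shard processes (`Demo.ShardedCounter` is hypothetical): `:erlang.phash2/2` picks the shard, so updates to keys on different shards proceed in parallel while updates to the same key stay serialized.

```elixir
defmodule Demo.ShardedCounter do
  # Start `count` Agents and keep their pids in a tuple for constant-time indexing
  def start(count) do
    pids =
      for _ <- 1..count do
        {:ok, pid} = Agent.start_link(fn -> %{} end)
        pid
      end

    List.to_tuple(pids)
  end

  def incr(shards, key) do
    Agent.update(shard_for(shards, key), fn counts -> Map.update(counts, key, 1, &(&1 + 1)) end)
  end

  def get(shards, key) do
    Agent.get(shard_for(shards, key), fn counts -> Map.get(counts, key, 0) end)
  end

  # :erlang.phash2/2 returns 0..range-1, so a given key always maps to the same shard
  defp shard_for(shards, key) do
    elem(shards, :erlang.phash2(key, tuple_size(shards)))
  end
end

shards = Demo.ShardedCounter.start(4)
Enum.each(1..10, fn _ -> Demo.ShardedCounter.incr(shards, "user:1") end)
```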
Key Principles
- Embrace concurrency: Use processes liberally, they're cheap
- Let it crash: Don't write defensive code, use supervision
- Isolate failures: Design supervision trees to contain failures
- Communicate via messages: Avoid shared state between processes
- Use the right tool: GenServer for state, Task for work, Agent for simple state
- Test at boundaries: Test process APIs, not internal implementation
- Monitor and observe: Use Observer and logging to understand system behavior
Quick Install
/plugin add https://github.com/vinnie357/claude-skills/tree/main/otp
Copy and paste this command in Claude Code to install this skill
