concurrency-patterns
About
This skill provides patterns and implementations for writing thread-safe concurrent code, including mutexes, semaphores, and async/await. Use it when building multi-threaded applications, preventing race conditions, or developing high-performance systems that require parallel data processing. It offers practical examples like promise pools and concurrent data structures for task coordination and resource management.
Quick Install
Claude Code
Recommended: /plugin add https://github.com/aj-geddes/useful-ai-prompts
Copy and paste this command in Claude Code to install this skill.
Alternatively: git clone https://github.com/aj-geddes/useful-ai-prompts.git ~/.claude/skills/concurrency-patterns
Documentation
Concurrency Patterns
Overview
Implement safe concurrent code using proper synchronization primitives and patterns for parallel execution.
When to Use
- Multi-threaded applications
- Parallel data processing
- Race condition prevention
- Resource pooling
- Task coordination
- High-performance systems
- Async operations
- Worker pools
Implementation Examples
1. Promise Pool (TypeScript)
class PromisePool {
  // Resolvers for callers that are waiting on a free slot
  private queue: Array<() => void> = [];
  private active = 0;

  constructor(private concurrency: number) {}

  async add<T>(fn: () => Promise<T>): Promise<T> {
    // Re-check after every wake-up: another caller may have taken the slot
    while (this.active >= this.concurrency) {
      await this.waitForSlot();
    }
    this.active++;
    try {
      return await fn();
    } finally {
      this.active--;
      // Wake one waiter directly instead of having waiters poll on a timer
      const next = this.queue.shift();
      if (next) next();
    }
  }

  private waitForSlot(): Promise<void> {
    return new Promise(resolve => {
      this.queue.push(resolve);
    });
  }

  async map<T, R>(
    items: T[],
    fn: (item: T) => Promise<R>
  ): Promise<R[]> {
    // Promise.all preserves input order and rejects on the first failure
    return Promise.all(
      items.map(item => this.add(() => fn(item)))
    );
  }
}

// Usage (top-level await requires an ES module context)
const pool = new PromisePool(5);
const urls = Array.from({ length: 100 }, (_, i) =>
  `https://api.example.com/item/${i}`
);
const results = await pool.map(urls, async (url) => {
  const response = await fetch(url);
  return response.json();
});
2. Mutex and Semaphore (TypeScript)
class Mutex {
  private locked = false;
  private queue: Array<() => void> = [];

  async acquire(): Promise<void> {
    if (!this.locked) {
      this.locked = true;
      return;
    }
    return new Promise(resolve => {
      this.queue.push(resolve);
    });
  }

  release(): void {
    if (this.queue.length > 0) {
      // Hand the lock straight to the next waiter; `locked` stays true
      const resolve = this.queue.shift()!;
      resolve();
    } else {
      this.locked = false;
    }
  }

  async runExclusive<T>(fn: () => Promise<T>): Promise<T> {
    await this.acquire();
    try {
      return await fn();
    } finally {
      this.release();
    }
  }
}

class Semaphore {
  private available: number;
  private queue: Array<() => void> = [];

  constructor(private max: number) {
    this.available = max;
  }

  async acquire(): Promise<void> {
    if (this.available > 0) {
      this.available--;
      return;
    }
    return new Promise(resolve => {
      this.queue.push(resolve);
    });
  }

  release(): void {
    if (this.queue.length > 0) {
      // Pass the permit directly to the next waiter
      const resolve = this.queue.shift()!;
      resolve();
    } else {
      this.available++;
    }
  }

  // Runs fn while holding one permit (up to `max` callers at once)
  async runExclusive<T>(fn: () => Promise<T>): Promise<T> {
    await this.acquire();
    try {
      return await fn();
    } finally {
      this.release();
    }
  }
}

// Usage
const mutex = new Mutex();
let counter = 0;

async function incrementCounter() {
  await mutex.runExclusive(async () => {
    const current = counter;
    await new Promise(resolve => setTimeout(resolve, 10));
    counter = current + 1;
  });
}

// Database connection pool with semaphore
const dbSemaphore = new Semaphore(10); // Max 10 concurrent connections

async function queryDatabase(query: string) {
  return dbSemaphore.runExclusive(async () => {
    // Execute query
    return executeQuery(query);
  });
}

async function executeQuery(query: string) {
  // Query logic
}
3. Worker Pool (Node.js)
import { Worker } from 'worker_threads';

interface Task<T> {
  id: string;
  data: any;
  resolve: (value: T) => void;
  reject: (error: Error) => void;
}

class WorkerPool {
  private workers: Worker[] = [];
  private availableWorkers: Worker[] = [];
  private taskQueue: Task<any>[] = [];

  constructor(
    private workerScript: string,
    private poolSize: number
  ) {
    this.initializeWorkers();
  }

  private initializeWorkers(): void {
    for (let i = 0; i < this.poolSize; i++) {
      const worker = new Worker(this.workerScript);
      worker.on('message', (result) => {
        this.handleWorkerMessage(worker, result);
      });
      worker.on('error', (error) => {
        console.error('Worker error:', error);
        // Reject the in-flight task so its caller is not left waiting forever.
        // The worker is not returned to the pool: an uncaught exception
        // terminates the worker thread.
        const task = (worker as any).currentTask as Task<any> | undefined;
        if (task) {
          task.reject(error);
          delete (worker as any).currentTask;
        }
      });
      this.workers.push(worker);
      this.availableWorkers.push(worker);
    }
  }

  async execute<T>(data: any): Promise<T> {
    return new Promise((resolve, reject) => {
      const task: Task<T> = {
        id: Math.random().toString(36),
        data,
        resolve,
        reject
      };
      this.taskQueue.push(task);
      this.processQueue();
    });
  }

  private processQueue(): void {
    // Pair queued tasks with idle workers until one of the two runs out
    while (this.taskQueue.length > 0 && this.availableWorkers.length > 0) {
      const task = this.taskQueue.shift()!;
      const worker = this.availableWorkers.shift()!;
      worker.postMessage({
        taskId: task.id,
        data: task.data
      });
      (worker as any).currentTask = task;
    }
  }

  private handleWorkerMessage(worker: Worker, result: any): void {
    const task = (worker as any).currentTask as Task<any>;
    if (!task) return;
    if (result.error) {
      task.reject(new Error(result.error));
    } else {
      task.resolve(result.data);
    }
    delete (worker as any).currentTask;
    this.availableWorkers.push(worker);
    this.processQueue();
  }

  async terminate(): Promise<void> {
    await Promise.all(
      this.workers.map(worker => worker.terminate())
    );
  }
}

// worker.js
// const { parentPort } = require('worker_threads');
//
// parentPort.on('message', async ({ taskId, data }) => {
//   try {
//     const result = await processData(data);
//     parentPort.postMessage({ taskId, data: result });
//   } catch (error) {
//     parentPort.postMessage({ taskId, error: error.message });
//   }
// });
4. Python Threading Patterns
import threading
import time
from queue import Queue
from typing import Callable, Generic, List, Optional, TypeVar

T = TypeVar('T')
R = TypeVar('R')

# Unique sentinel telling a worker to exit (None may be a legitimate item)
_STOP = object()


class ThreadPool(Generic[T, R]):
    def __init__(self, num_threads: int):
        self.num_threads = num_threads
        self.tasks: Queue = Queue()
        self.results: List[Optional[R]] = []
        self.errors: List[Exception] = []
        self.lock = threading.Lock()
        self.workers: List[threading.Thread] = []

    def map(self, func: Callable[[T], R], items: List[T]) -> List[R]:
        """Map function over items using thread pool, preserving input order."""
        # Reset per-call state so the pool can be reused
        self.results = [None] * len(items)
        self.errors = []
        self.workers = []

        # Add (index, item) tasks to queue
        for index, item in enumerate(items):
            self.tasks.put((index, item))

        # Start workers
        for _ in range(self.num_threads):
            worker = threading.Thread(
                target=self._worker,
                args=(func,)
            )
            worker.start()
            self.workers.append(worker)

        # Wait for completion
        self.tasks.join()

        # Stop workers
        for _ in range(self.num_threads):
            self.tasks.put(_STOP)
        for worker in self.workers:
            worker.join()

        # Surface the first failure instead of silently dropping it
        if self.errors:
            raise self.errors[0]
        return self.results

    def _worker(self, func: Callable[[T], R]):
        """Worker thread."""
        while True:
            task = self.tasks.get()
            if task is _STOP:
                self.tasks.task_done()
                break
            index, item = task
            try:
                result = func(item)
                with self.lock:
                    self.results[index] = result
            except Exception as exc:
                # Keep the worker alive; an uncaught exception would end the
                # thread and could leave tasks.join() waiting forever
                with self.lock:
                    self.errors.append(exc)
            finally:
                self.tasks.task_done()


class Mutex:
    def __init__(self):
        self._lock = threading.Lock()

    def __enter__(self):
        self._lock.acquire()
        return self

    def __exit__(self, *args):
        self._lock.release()


class Semaphore:
    def __init__(self, max_count: int):
        self._semaphore = threading.Semaphore(max_count)

    def __enter__(self):
        self._semaphore.acquire()
        return self

    def __exit__(self, *args):
        self._semaphore.release()


# Usage
def process_item(item: int) -> int:
    time.sleep(0.1)
    return item * 2


pool = ThreadPool(num_threads=5)
items = list(range(100))
results = pool.map(process_item, items)
print(f"Processed {len(results)} items")

# Mutex example
counter = 0
mutex = Mutex()


def increment():
    global counter
    with mutex:
        current = counter
        time.sleep(0.001)
        counter = current + 1


threads = [threading.Thread(target=increment) for _ in range(100)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"Counter: {counter}")  # Should be 100

# Semaphore example
db_connections = Semaphore(max_count=10)


def query_database(query: str):
    with db_connections:
        # Execute query with limited connections
        time.sleep(0.1)
        print(f"Executing: {query}")
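For comparison, the thread pool pattern above can be sketched with the standard library's `concurrent.futures.ThreadPoolExecutor`, which manages worker startup and shutdown itself. This is a minimal sketch, not a replacement for the class above; `run_pool` is a hypothetical helper name, and the shortened sleep is only there to keep the example fast.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List


def process_item(item: int) -> int:
    # Stand-in for I/O-bound work (assumption: a short sleep)
    time.sleep(0.01)
    return item * 2


def run_pool(items: List[int], num_threads: int = 5) -> List[int]:
    # executor.map yields results in input order, regardless of which
    # worker finishes first; leaving the with-block joins all workers
    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        return list(executor.map(process_item, items))


results = run_pool(list(range(20)))
print(results[:5])  # [0, 2, 4, 6, 8]
```

If `process_item` raises, the exception is re-raised when the corresponding result is read from the iterator, so failures are not silently dropped.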
5. Async Patterns (Python asyncio)
import asyncio
from typing import Callable, List, TypeVar, Awaitable

T = TypeVar('T')
R = TypeVar('R')


class AsyncPool:
    def __init__(self, concurrency: int):
        self.semaphore = asyncio.Semaphore(concurrency)

    async def map(
        self,
        func: Callable[[T], Awaitable[R]],
        items: List[T]
    ) -> List[R]:
        """Map async function over items with concurrency limit."""
        async def bounded_func(item: T) -> R:
            async with self.semaphore:
                return await func(item)

        return await asyncio.gather(*[
            bounded_func(item) for item in items
        ])


class AsyncQueue:
    def __init__(self, max_size: int = 0):
        self.queue = asyncio.Queue(maxsize=max_size)

    async def put(self, item):
        await self.queue.put(item)

    async def get(self):
        return await self.queue.get()

    def task_done(self):
        self.queue.task_done()

    async def join(self):
        await self.queue.join()


# Producer-Consumer pattern
async def producer(queue: AsyncQueue, items: List[int]):
    """Produce items."""
    for item in items:
        await queue.put(item)
        print(f"Produced: {item}")
        await asyncio.sleep(0.1)


async def consumer(queue: AsyncQueue, name: str):
    """Consume items."""
    while True:
        item = await queue.get()
        if item is None:
            queue.task_done()
            break
        print(f"{name} consuming: {item}")
        await asyncio.sleep(0.2)
        queue.task_done()


async def main():
    queue = AsyncQueue(max_size=10)

    # Start consumers
    consumers = [
        asyncio.create_task(consumer(queue, f"Consumer-{i}"))
        for i in range(3)
    ]

    # Start producer
    await producer(queue, list(range(20)))

    # Wait for all items to be processed
    await queue.join()

    # Stop consumers
    for _ in range(3):
        await queue.put(None)
    await asyncio.gather(*consumers)


asyncio.run(main())
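The demo above exercises the queue but not the semaphore-bounded map. The following sketch shows the same bounding idea in a standalone function and records how many coroutines run at once. `bounded_map`, `demo`, and `fetch` are hypothetical names used only for illustration, and `fetch` simulates I/O with a short sleep.

```python
import asyncio
from typing import Awaitable, Callable, List, Tuple


async def bounded_map(
    func: Callable[[int], Awaitable[int]],
    items: List[int],
    concurrency: int,
) -> List[int]:
    # Create the semaphore inside the running event loop
    semaphore = asyncio.Semaphore(concurrency)

    async def bounded(item: int) -> int:
        async with semaphore:
            return await func(item)

    # gather returns results in the order the awaitables were passed
    return await asyncio.gather(*(bounded(item) for item in items))


async def demo() -> Tuple[List[int], int]:
    running = 0
    peak = 0

    async def fetch(item: int) -> int:
        nonlocal running, peak
        running += 1
        peak = max(peak, running)  # Track the highest observed concurrency
        await asyncio.sleep(0.01)  # Simulated I/O
        running -= 1
        return item * 2

    results = await bounded_map(fetch, list(range(10)), concurrency=3)
    return results, peak


results, peak = asyncio.run(demo())
print(results, peak)
```

All ten coroutines are created up front, but the semaphore lets only three of them into `fetch` at a time, so `peak` should not exceed the limit.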
6. Go-Style Channels (Simulation)
class Channel<T> {
  private buffer: T[] = [];
  private senders: Array<{ value: T; resolve: () => void }> = [];
  private receivers: Array<(value: T | undefined) => void> = [];
  private closed = false;

  constructor(private bufferSize: number = 0) {}

  async send(value: T): Promise<void> {
    if (this.closed) {
      throw new Error('Channel is closed');
    }
    // Hand the value directly to a waiting receiver, if any
    if (this.receivers.length > 0) {
      const receiver = this.receivers.shift()!;
      receiver(value);
      return;
    }
    if (this.buffer.length < this.bufferSize) {
      this.buffer.push(value);
      return;
    }
    // Buffer is full: block until a receiver takes the value
    return new Promise(resolve => {
      this.senders.push({ value, resolve });
    });
  }

  async receive(): Promise<T | undefined> {
    if (this.buffer.length > 0) {
      const value = this.buffer.shift()!;
      // A slot opened up: move one blocked sender's value into the buffer
      if (this.senders.length > 0) {
        const sender = this.senders.shift()!;
        this.buffer.push(sender.value);
        sender.resolve();
      }
      return value;
    }
    if (this.senders.length > 0) {
      const sender = this.senders.shift()!;
      sender.resolve();
      return sender.value;
    }
    // undefined signals a closed, drained channel
    if (this.closed) {
      return undefined;
    }
    return new Promise(resolve => {
      this.receivers.push(resolve);
    });
  }

  close(): void {
    this.closed = true;
    this.receivers.forEach(receiver => receiver(undefined));
    this.receivers = [];
  }
}

// Usage
async function example() {
  const channel = new Channel<number>(5);

  // Producer
  async function producer() {
    for (let i = 0; i < 10; i++) {
      await channel.send(i);
      console.log(`Sent: ${i}`);
    }
    channel.close();
  }

  // Consumer
  async function consumer() {
    while (true) {
      const value = await channel.receive();
      if (value === undefined) break;
      console.log(`Received: ${value}`);
    }
  }

  await Promise.all([
    producer(),
    consumer()
  ]);
}
Best Practices
✅ DO
- Use proper synchronization primitives
- Limit concurrency to avoid resource exhaustion
- Handle errors in concurrent operations
- Use immutable data when possible
- Test concurrent code thoroughly
- Profile concurrent performance
- Document thread-safety guarantees
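As one way to combine "limit concurrency" and "handle errors in concurrent operations", the sketch below runs tasks on a bounded executor and collects successes and failures separately, so one failing task does not hide the results of the others. The names `risky` and `run_all` are hypothetical, and the failure rule (multiples of 3 raise) is invented purely for demonstration.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, Iterable, Tuple


def risky(n: int) -> int:
    # Hypothetical task: fails for multiples of 3
    if n % 3 == 0:
        raise ValueError(f"bad input: {n}")
    return n * 10


def run_all(
    inputs: Iterable[int], max_workers: int = 4
) -> Tuple[Dict[int, int], Dict[int, str]]:
    successes: Dict[int, int] = {}
    failures: Dict[int, str] = {}
    # max_workers caps the number of threads, however many inputs arrive
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(risky, n): n for n in inputs}
        for future in as_completed(futures):
            n = futures[future]
            try:
                # result() re-raises any exception from the worker thread
                successes[n] = future.result()
            except ValueError as exc:
                failures[n] = str(exc)
    return successes, failures


successes, failures = run_all(range(1, 7))
print(sorted(successes.items()))  # [(1, 10), (2, 20), (4, 40), (5, 50)]
print(sorted(failures.items()))   # [(3, 'bad input: 3'), (6, 'bad input: 6')]
```

Results are keyed by input rather than appended to a list because `as_completed` yields futures in completion order, not submission order.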
❌ DON'T
- Share mutable state without synchronization
- Use sleep/polling for coordination
- Create unlimited threads/workers
- Ignore race conditions
- Block event loops in async code
- Forget to clean up resources
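To illustrate the point about sleep/polling, here is a minimal sketch in which a consumer thread blocks on a `threading.Event` until the producer signals, instead of checking a flag in a sleep loop. The `shared` dictionary and the value 42 are placeholders chosen for the example.

```python
import threading
from typing import List

ready = threading.Event()
shared = {}


def producer() -> None:
    shared["value"] = 42  # Publish the data first...
    ready.set()           # ...then wake every thread waiting on the event


def consumer(out: List[int]) -> None:
    # wait() blocks without busy-waiting; the timeout is a safety net so a
    # missing signal cannot hang the thread forever
    if ready.wait(timeout=5):
        out.append(shared["value"])


out: List[int] = []
consumer_thread = threading.Thread(target=consumer, args=(out,))
producer_thread = threading.Thread(target=producer)
consumer_thread.start()
producer_thread.start()
consumer_thread.join()
producer_thread.join()
print(out)  # [42]
```

The consumer wakes as soon as the event is set, whereas a polling loop reacts only at its next check and burns wake-ups in the meantime.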
Resources
GitHub Repository
