concurrency-patterns
About
This skill provides patterns and implementations for writing thread-safe concurrent code, including mutexes, semaphores, and async/await. Use it when building multi-threaded applications, preventing race conditions, or developing high-performance systems that require parallel data processing. It offers practical examples like promise pools and concurrent data structures for task coordination and resource management.
Documentation
Concurrency Patterns
Overview
Implement safe concurrent code using proper synchronization primitives and patterns for parallel execution.
When to Use
- Multi-threaded applications
- Parallel data processing
- Race condition prevention (see the sketch after this list)
- Resource pooling
- Task coordination
- High-performance systems
- Async operations
- Worker pools
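As a quick illustration of the races these patterns prevent, the sketch below shows an unsynchronized read-modify-write losing an update; the Mutex usage in example 2 is the safe counterpart:
// Two interleaved read-modify-write sequences on a shared counter.
// (Run inside an async context or an ES module with top-level await.)
let counter = 0;

async function unsafeIncrement() {
  const current = counter;                               // read
  await new Promise(resolve => setTimeout(resolve, 10)); // interleaving point
  counter = current + 1;                                 // write back a stale value
}

// Both calls read counter === 0, so the final value is 1, not 2.
await Promise.all([unsafeIncrement(), unsafeIncrement()]);
console.log(counter); // 1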
Implementation Examples
1. Promise Pool (TypeScript)
class PromisePool {
  // Resolvers for callers parked while the pool is at capacity.
  private waiters: Array<() => void> = [];
  private active = 0;

  constructor(private concurrency: number) {}

  async add<T>(fn: () => Promise<T>): Promise<T> {
    while (this.active >= this.concurrency) {
      await this.waitForSlot();
    }
    this.active++;
    try {
      return await fn();
    } finally {
      this.active--;
      // Wake one parked caller; it re-checks the limit in its while loop.
      this.waiters.shift()?.();
    }
  }

  private waitForSlot(): Promise<void> {
    // Park until a running task completes; no timer-based polling.
    return new Promise(resolve => {
      this.waiters.push(resolve);
    });
  }

  async map<T, R>(
    items: T[],
    fn: (item: T) => Promise<R>
  ): Promise<R[]> {
    return Promise.all(
      items.map(item => this.add(() => fn(item)))
    );
  }
}
// Usage
const pool = new PromisePool(5);
const urls = Array.from({ length: 100 }, (_, i) =>
  `https://api.example.com/item/${i}`
);
const results = await pool.map(urls, async (url) => {
  const response = await fetch(url);
  return response.json();
});
2. Mutex and Semaphore (TypeScript)
class Mutex {
  private locked = false;
  private queue: Array<() => void> = [];

  async acquire(): Promise<void> {
    if (!this.locked) {
      this.locked = true;
      return;
    }
    return new Promise(resolve => {
      this.queue.push(resolve);
    });
  }

  release(): void {
    if (this.queue.length > 0) {
      // Hand the lock directly to the next waiter; `locked` stays true.
      const resolve = this.queue.shift()!;
      resolve();
    } else {
      this.locked = false;
    }
  }

  async runExclusive<T>(fn: () => Promise<T>): Promise<T> {
    await this.acquire();
    try {
      return await fn();
    } finally {
      this.release();
    }
  }
}

class Semaphore {
  private available: number;
  private queue: Array<() => void> = [];

  constructor(private max: number) {
    this.available = max;
  }

  async acquire(): Promise<void> {
    if (this.available > 0) {
      this.available--;
      return;
    }
    return new Promise(resolve => {
      this.queue.push(resolve);
    });
  }

  release(): void {
    if (this.queue.length > 0) {
      // Hand the permit directly to the next waiter.
      const resolve = this.queue.shift()!;
      resolve();
    } else {
      this.available++;
    }
  }

  // Runs fn while holding one of the semaphore's permits.
  async runExclusive<T>(fn: () => Promise<T>): Promise<T> {
    await this.acquire();
    try {
      return await fn();
    } finally {
      this.release();
    }
  }
}
// Usage
const mutex = new Mutex();
let counter = 0;

async function incrementCounter() {
  await mutex.runExclusive(async () => {
    const current = counter;
    await new Promise(resolve => setTimeout(resolve, 10));
    counter = current + 1;
  });
}

// Database connection pool with semaphore
const dbSemaphore = new Semaphore(10); // Max 10 concurrent connections

async function queryDatabase(query: string) {
  return dbSemaphore.runExclusive(async () => {
    // Execute query while holding one of the permits
    return executeQuery(query);
  });
}

async function executeQuery(query: string) {
  // Query logic (placeholder)
}
3. Worker Pool (Node.js)
import { Worker } from 'worker_threads';

interface Task<T> {
  id: string;
  data: any;
  resolve: (value: T) => void;
  reject: (error: Error) => void;
}

class WorkerPool {
  private workers: Worker[] = [];
  private availableWorkers: Worker[] = [];
  private taskQueue: Task<any>[] = [];

  constructor(
    private workerScript: string,
    private poolSize: number
  ) {
    this.initializeWorkers();
  }

  private initializeWorkers(): void {
    for (let i = 0; i < this.poolSize; i++) {
      this.spawnWorker();
    }
  }

  private spawnWorker(): void {
    const worker = new Worker(this.workerScript);
    worker.on('message', (result) => {
      this.handleWorkerMessage(worker, result);
    });
    worker.on('error', (error) => {
      // An uncaught error terminates the worker: fail its in-flight
      // task and spawn a replacement to keep the pool at full size.
      const task = (worker as any).currentTask as Task<any> | undefined;
      if (task) task.reject(error);
      this.workers = this.workers.filter(w => w !== worker);
      this.availableWorkers = this.availableWorkers.filter(w => w !== worker);
      this.spawnWorker();
      this.processQueue();
    });
    this.workers.push(worker);
    this.availableWorkers.push(worker);
  }

  async execute<T>(data: any): Promise<T> {
    return new Promise((resolve, reject) => {
      const task: Task<T> = {
        id: Math.random().toString(36).slice(2),
        data,
        resolve,
        reject
      };
      this.taskQueue.push(task);
      this.processQueue();
    });
  }

  private processQueue(): void {
    while (this.taskQueue.length > 0 && this.availableWorkers.length > 0) {
      const task = this.taskQueue.shift()!;
      const worker = this.availableWorkers.shift()!;
      worker.postMessage({
        taskId: task.id,
        data: task.data
      });
      // Track the in-flight task on the worker object itself
      (worker as any).currentTask = task;
    }
  }

  private handleWorkerMessage(worker: Worker, result: any): void {
    const task = (worker as any).currentTask as Task<any>;
    if (!task) return;
    if (result.error) {
      task.reject(new Error(result.error));
    } else {
      task.resolve(result.data);
    }
    delete (worker as any).currentTask;
    this.availableWorkers.push(worker);
    this.processQueue();
  }

  async terminate(): Promise<void> {
    await Promise.all(
      this.workers.map(worker => worker.terminate())
    );
  }
}
// worker.js
// const { parentPort } = require('worker_threads');
//
// parentPort.on('message', async ({ taskId, data }) => {
//   try {
//     const result = await processData(data);
//     parentPort.postMessage({ taskId, data: result });
//   } catch (error) {
//     parentPort.postMessage({ taskId, error: error.message });
//   }
// });
4. Python Threading Patterns
import threading
from queue import Queue
from typing import Callable, List, TypeVar, Generic
import time

T = TypeVar('T')
R = TypeVar('R')

class ThreadPool(Generic[T, R]):
    def __init__(self, num_threads: int):
        self.num_threads = num_threads
        self.tasks: Queue = Queue()
        self.results: List[R] = []
        self.lock = threading.Lock()
        self.workers: List[threading.Thread] = []

    def map(self, func: Callable[[T], R], items: List[T]) -> List[R]:
        """Map function over items using the thread pool.

        Note: results are collected in completion order, not input order.
        """
        # Add tasks to queue
        for item in items:
            self.tasks.put(item)
        # Start workers
        for _ in range(self.num_threads):
            worker = threading.Thread(
                target=self._worker,
                args=(func,)
            )
            worker.start()
            self.workers.append(worker)
        # Wait for completion
        self.tasks.join()
        # Stop workers with sentinel values
        for _ in range(self.num_threads):
            self.tasks.put(None)
        for worker in self.workers:
            worker.join()
        return self.results

    def _worker(self, func: Callable[[T], R]):
        """Worker thread: process items until a None sentinel arrives."""
        while True:
            item = self.tasks.get()
            if item is None:
                self.tasks.task_done()
                break
            try:
                result = func(item)
                with self.lock:
                    self.results.append(result)
            finally:
                self.tasks.task_done()
class Mutex:
    def __init__(self):
        self._lock = threading.Lock()

    def __enter__(self):
        self._lock.acquire()
        return self

    def __exit__(self, *args):
        self._lock.release()

class Semaphore:
    def __init__(self, max_count: int):
        self._semaphore = threading.Semaphore(max_count)

    def __enter__(self):
        self._semaphore.acquire()
        return self

    def __exit__(self, *args):
        self._semaphore.release()
# Usage
def process_item(item: int) -> int:
    time.sleep(0.1)
    return item * 2

pool = ThreadPool(num_threads=5)
items = list(range(100))
results = pool.map(process_item, items)
print(f"Processed {len(results)} items")

# Mutex example
counter = 0
mutex = Mutex()

def increment():
    global counter
    with mutex:
        current = counter
        time.sleep(0.001)
        counter = current + 1

threads = [threading.Thread(target=increment) for _ in range(100)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"Counter: {counter}")  # Should be 100

# Semaphore example
db_connections = Semaphore(max_count=10)

def query_database(query: str):
    with db_connections:
        # Execute query with at most 10 concurrent connections
        time.sleep(0.1)
        print(f"Executing: {query}")
5. Async Patterns (Python asyncio)
import asyncio
from typing import Callable, List, TypeVar, Awaitable

T = TypeVar('T')
R = TypeVar('R')

class AsyncPool:
    def __init__(self, concurrency: int):
        self.semaphore = asyncio.Semaphore(concurrency)

    async def map(
        self,
        func: Callable[[T], Awaitable[R]],
        items: List[T]
    ) -> List[R]:
        """Map async function over items with a concurrency limit."""
        async def bounded_func(item: T) -> R:
            async with self.semaphore:
                return await func(item)

        return await asyncio.gather(*[
            bounded_func(item) for item in items
        ])

class AsyncQueue:
    def __init__(self, max_size: int = 0):
        self.queue = asyncio.Queue(maxsize=max_size)

    async def put(self, item):
        await self.queue.put(item)

    async def get(self):
        return await self.queue.get()

    def task_done(self):
        self.queue.task_done()

    async def join(self):
        await self.queue.join()
# Producer-Consumer pattern
async def producer(queue: AsyncQueue, items: List[int]):
    """Produce items."""
    for item in items:
        await queue.put(item)
        print(f"Produced: {item}")
        await asyncio.sleep(0.1)

async def consumer(queue: AsyncQueue, name: str):
    """Consume items until a None sentinel arrives."""
    while True:
        item = await queue.get()
        if item is None:
            queue.task_done()
            break
        print(f"{name} consuming: {item}")
        await asyncio.sleep(0.2)
        queue.task_done()

async def main():
    queue = AsyncQueue(max_size=10)
    # Start consumers
    consumers = [
        asyncio.create_task(consumer(queue, f"Consumer-{i}"))
        for i in range(3)
    ]
    # Start producer
    await producer(queue, list(range(20)))
    # Wait for all items to be processed
    await queue.join()
    # Stop consumers
    for _ in range(3):
        await queue.put(None)
    await asyncio.gather(*consumers)

asyncio.run(main())
6. Go-Style Channels (Simulation)
class Channel<T> {
  private buffer: T[] = [];
  private senders: Array<{ value: T; resolve: () => void }> = [];
  private receivers: Array<(value: T | undefined) => void> = [];
  private closed = false;

  constructor(private bufferSize: number = 0) {}

  async send(value: T): Promise<void> {
    if (this.closed) {
      throw new Error('Channel is closed');
    }
    // Hand off directly to a waiting receiver if there is one
    if (this.receivers.length > 0) {
      const receiver = this.receivers.shift()!;
      receiver(value);
      return;
    }
    // Buffer the value if there is room
    if (this.buffer.length < this.bufferSize) {
      this.buffer.push(value);
      return;
    }
    // Otherwise block until a receiver makes room
    return new Promise(resolve => {
      this.senders.push({ value, resolve });
    });
  }

  async receive(): Promise<T | undefined> {
    if (this.buffer.length > 0) {
      const value = this.buffer.shift()!;
      // A blocked sender can now move its value into the buffer
      if (this.senders.length > 0) {
        const sender = this.senders.shift()!;
        this.buffer.push(sender.value);
        sender.resolve();
      }
      return value;
    }
    // Unbuffered handoff from a blocked sender
    if (this.senders.length > 0) {
      const sender = this.senders.shift()!;
      sender.resolve();
      return sender.value;
    }
    if (this.closed) {
      return undefined;
    }
    // Block until a value arrives or the channel closes
    return new Promise(resolve => {
      this.receivers.push(resolve);
    });
  }

  close(): void {
    this.closed = true;
    // Wake all blocked receivers; undefined signals a closed channel
    this.receivers.forEach(receiver => receiver(undefined));
    this.receivers = [];
  }
}
// Usage
async function example() {
  const channel = new Channel<number>(5);

  // Producer
  async function producer() {
    for (let i = 0; i < 10; i++) {
      await channel.send(i);
      console.log(`Sent: ${i}`);
    }
    channel.close();
  }

  // Consumer
  async function consumer() {
    while (true) {
      const value = await channel.receive();
      if (value === undefined) break;
      console.log(`Received: ${value}`);
    }
  }

  await Promise.all([
    producer(),
    consumer()
  ]);
}
Best Practices
✅ DO
- Use proper synchronization primitives
- Limit concurrency to avoid resource exhaustion
- Handle errors in concurrent operations (see the sketch after this list)
- Use immutable data when possible
- Test concurrent code thoroughly
- Profile concurrent performance
- Document thread-safety guarantees
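One way to follow the error-handling point above is Promise.allSettled, which keeps successful results even when some tasks reject. A minimal sketch; fetchItem is a hypothetical task used only for illustration:
// Hypothetical task used only for illustration.
async function fetchItem(id: number): Promise<number> {
  if (id % 7 === 0) throw new Error(`item ${id} failed`);
  return id * 2;
}

// Collect successes and count failures without losing either.
async function fetchAll(ids: number[]): Promise<{ ok: number[]; failed: number }> {
  const settled = await Promise.allSettled(ids.map(id => fetchItem(id)));
  const ok: number[] = [];
  let failed = 0;
  for (const result of settled) {
    if (result.status === 'fulfilled') {
      ok.push(result.value);
    } else {
      failed++; // result.reason carries the error; log or rethrow as needed
    }
  }
  return { ok, failed };
}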
❌ DON'T
- Share mutable state without synchronization
- Use sleep/polling for coordination
- Create unlimited threads/workers
- Ignore race conditions
- Block event loops in async code (the sketch after this list shows how to yield instead)
- Forget to clean up resources
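To avoid blocking the event loop during CPU-heavy work, one option is to yield periodically. A minimal sketch, assuming a Node.js environment where setImmediate is available:
// Chunk CPU-bound work and yield so timers and I/O stay responsive.
async function processLargeArray(items: number[]): Promise<number[]> {
  const results: number[] = [];
  for (let i = 0; i < items.length; i++) {
    results.push(items[i] * 2); // stand-in for real per-item work
    if (i > 0 && i % 1000 === 0) {
      // Yield to the event loop every 1000 items.
      await new Promise<void>(resolve => setImmediate(resolve));
    }
  }
  return results;
}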
Resources
Quick Install
Copy and paste this command in Claude Code to install this skill:
/plugin add https://github.com/aj-geddes/useful-ai-prompts/tree/main/concurrency-patterns
GitHub Repository
Related Skills
sglang
SGLang is a high-performance LLM serving framework that specializes in fast, structured generation for JSON, regex, and agentic workflows using its RadixAttention prefix caching. It delivers significantly faster inference, especially for tasks with repeated prefixes, making it ideal for complex, structured outputs and multi-turn conversations. Choose SGLang over alternatives like vLLM when you need constrained decoding or are building applications with extensive prefix sharing.
evaluating-llms-harness
This Claude Skill runs the lm-evaluation-harness to benchmark LLMs across 60+ standardized academic tasks like MMLU and GSM8K. It's designed for developers to compare model quality, track training progress, or report academic results. The tool supports various backends including HuggingFace and vLLM models.
llamaguard
LlamaGuard is Meta's 7-8B parameter model for moderating LLM inputs and outputs across six safety categories like violence and hate speech. It offers 94-95% accuracy and can be deployed using vLLM, Hugging Face, or Amazon SageMaker. Use this skill to easily integrate content filtering and safety guardrails into your AI applications.
langchain
LangChain is a framework for building LLM applications using agents, chains, and RAG pipelines. It supports multiple LLM providers, offers 500+ integrations, and includes features like tool calling and memory management. Use it for rapid prototyping and deploying production systems like chatbots, autonomous agents, and question-answering services.
