event-sourcing
About
This skill implements event sourcing and CQRS patterns using event stores, aggregates, and projections. It enables building systems with full audit trails, temporal queries, and event replay capabilities. Use it when you need complete history tracking, must meet compliance requirements, or have complex domain models whose state has to be reconstructed from past changes.
Documentation
Event Sourcing
Overview
Store state changes as a sequence of events rather than the current state, enabling temporal queries, audit trails, and event replay.
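As a minimal sketch of the idea, current state can be treated as a fold over the event history. The `CounterEvent` shapes and the `replay` function are hypothetical names chosen for illustration and are not part of this skill's API.

```typescript
// Hypothetical event shapes, used only to illustrate the principle.
type CounterEvent =
  | { type: 'Incremented'; by: number }
  | { type: 'Decremented'; by: number }
  | { type: 'Reset' };

// Current state is never stored directly; it is derived by folding
// over the full event history, starting from an initial state of 0.
function replay(events: CounterEvent[]): number {
  return events.reduce((state, event) => {
    switch (event.type) {
      case 'Incremented':
        return state + event.by;
      case 'Decremented':
        return state - event.by;
      case 'Reset':
        return 0;
    }
  }, 0);
}

const history: CounterEvent[] = [
  { type: 'Incremented', by: 5 },
  { type: 'Decremented', by: 2 },
  { type: 'Reset' },
  { type: 'Incremented', by: 7 },
];

const current = replay(history);
console.log(current); // 7
```

Because the history is kept, the same function can be run over any prefix of it to see what the state was at that point.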
When to Use
- Audit trail requirements
- Temporal queries (state at any point in time)
- Event-driven microservices
- CQRS implementations
- Financial systems
- Complex domain models
- Debugging and analysis
- Compliance and regulation
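The temporal-query use case in the list above can be sketched as replaying only the events recorded up to a chosen instant. `TimedEvent` and `balanceAt` are hypothetical names, and the timestamps are arbitrary millisecond values chosen for the example.

```typescript
// Hypothetical event shape carrying its own timestamp.
interface TimedEvent {
  type: 'Deposited' | 'Withdrawn';
  amount: number;
  timestamp: number; // milliseconds, arbitrary values for the example
}

// State at a point in time: replay only the events at or before `asOf`.
function balanceAt(events: TimedEvent[], asOf: number): number {
  return events
    .filter(e => e.timestamp <= asOf)
    .reduce(
      (balance, e) =>
        e.type === 'Deposited' ? balance + e.amount : balance - e.amount,
      0
    );
}

const ledger: TimedEvent[] = [
  { type: 'Deposited', amount: 100, timestamp: 1000 },
  { type: 'Withdrawn', amount: 30, timestamp: 2000 },
  { type: 'Deposited', amount: 50, timestamp: 3000 },
];

console.log(balanceAt(ledger, 1500)); // 100
console.log(balanceAt(ledger, 2500)); // 70
console.log(balanceAt(ledger, 3000)); // 120
```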
Core Concepts
Event Store ─► Read Model (Projection)
     │
     └─► Aggregate (Domain Logic)
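One way to read the diagram: the event store is the single source of truth, projections consume its events to keep read models current, and aggregates rebuild their own state by replaying their stream. The sketch below assumes a hypothetical in-process `TinyEventStore` with a `subscribe` method, which is an illustration and not an API defined by this skill.

```typescript
// Hypothetical minimal shapes for illustration only.
interface FlowEvent {
  streamId: string;
  type: string;
  amount: number;
}
type Subscriber = (event: FlowEvent) => void;

class TinyEventStore {
  private log: FlowEvent[] = [];
  private subscribers: Subscriber[] = [];

  subscribe(fn: Subscriber): void {
    this.subscribers.push(fn);
  }

  // Append to the log, then notify every subscribed projection.
  append(event: FlowEvent): void {
    this.log.push(event);
    this.subscribers.forEach(fn => fn(event));
  }

  // Return the events of one stream in append order.
  read(streamId: string): FlowEvent[] {
    return this.log.filter(e => e.streamId === streamId);
  }
}

const store = new TinyEventStore();

// Read side: a projection kept up to date by subscription.
const totals = new Map<string, number>();
store.subscribe(e =>
  totals.set(e.streamId, (totals.get(e.streamId) ?? 0) + e.amount)
);

store.append({ streamId: 'cart-1', type: 'ItemAdded', amount: 2 });
store.append({ streamId: 'cart-1', type: 'ItemAdded', amount: 3 });
store.append({ streamId: 'cart-2', type: 'ItemAdded', amount: 1 });

// Write side: an aggregate's state rebuilt by replaying its own stream.
const cart1Quantity = store
  .read('cart-1')
  .reduce((sum, e) => sum + e.amount, 0);

console.log(totals.get('cart-1'), cart1Quantity); // 5 5
```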
Implementation Examples
1. Event Store (TypeScript)
interface DomainEvent {
  id: string;
  aggregateId: string;
  aggregateType: string;
  eventType: string;
  data: any;
  metadata: {
    userId?: string;
    timestamp: number;
    version: number;
  };
}

interface Aggregate {
  id: string;
  version: number;
}
class EventStore {
  private events: DomainEvent[] = [];

  async appendEvents(
    aggregateId: string,
    expectedVersion: number,
    events: Omit<DomainEvent, 'id' | 'metadata'>[]
  ): Promise<void> {
    // Optimistic concurrency check
    const currentVersion = await this.getCurrentVersion(aggregateId);
    if (currentVersion !== expectedVersion) {
      throw new Error('Concurrency conflict');
    }

    const newEvents = events.map((event, index) => ({
      ...event,
      id: crypto.randomUUID(),
      metadata: {
        timestamp: Date.now(),
        version: expectedVersion + index + 1
      }
    }));

    this.events.push(...newEvents);
  }

  async getEvents(aggregateId: string): Promise<DomainEvent[]> {
    return this.events
      .filter(e => e.aggregateId === aggregateId)
      .sort((a, b) => a.metadata.version - b.metadata.version);
  }

  async getCurrentVersion(aggregateId: string): Promise<number> {
    const events = await this.getEvents(aggregateId);
    return events.length > 0 ? events[events.length - 1].metadata.version : 0;
  }
}
// Bank Account Aggregate
interface BankAccountState {
  id: string;
  balance: number;
  isOpen: boolean;
  version: number;
}

class BankAccount implements Aggregate {
  id: string;
  version: number;
  private balance: number = 0;
  private isOpen: boolean = false;
  private uncommittedEvents: DomainEvent[] = [];

  constructor(id: string) {
    this.id = id;
    this.version = 0;
  }

  // Commands
  open(initialDeposit: number): void {
    if (this.isOpen) {
      throw new Error('Account already open');
    }
    this.applyEvent({
      eventType: 'AccountOpened',
      data: { initialDeposit }
    });
  }

  deposit(amount: number): void {
    if (!this.isOpen) {
      throw new Error('Account not open');
    }
    if (amount <= 0) {
      throw new Error('Amount must be positive');
    }
    this.applyEvent({
      eventType: 'MoneyDeposited',
      data: { amount }
    });
  }

  withdraw(amount: number): void {
    if (!this.isOpen) {
      throw new Error('Account not open');
    }
    if (amount <= 0) {
      throw new Error('Amount must be positive');
    }
    if (this.balance < amount) {
      throw new Error('Insufficient funds');
    }
    this.applyEvent({
      eventType: 'MoneyWithdrawn',
      data: { amount }
    });
  }

  close(): void {
    if (!this.isOpen) {
      throw new Error('Account not open');
    }
    if (this.balance > 0) {
      throw new Error('Cannot close account with positive balance');
    }
    this.applyEvent({
      eventType: 'AccountClosed',
      data: {}
    });
  }

  // Event Application
  private applyEvent(event: Partial<DomainEvent>): void {
    const fullEvent: any = {
      aggregateId: this.id,
      aggregateType: 'BankAccount',
      ...event
    };
    this.apply(fullEvent);
    this.uncommittedEvents.push(fullEvent);
  }

  apply(event: DomainEvent): void {
    switch (event.eventType) {
      case 'AccountOpened':
        this.isOpen = true;
        this.balance = event.data.initialDeposit;
        break;
      case 'MoneyDeposited':
        this.balance += event.data.amount;
        break;
      case 'MoneyWithdrawn':
        this.balance -= event.data.amount;
        break;
      case 'AccountClosed':
        this.isOpen = false;
        break;
    }
    if (event.metadata) {
      this.version = event.metadata.version;
    }
  }

  getUncommittedEvents(): DomainEvent[] {
    return this.uncommittedEvents;
  }

  clearUncommittedEvents(): void {
    this.uncommittedEvents = [];
  }

  getState(): BankAccountState {
    return {
      id: this.id,
      balance: this.balance,
      isOpen: this.isOpen,
      version: this.version
    };
  }
}
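// Repository
class BankAccountRepository {
  constructor(private eventStore: EventStore) {}

  async save(account: BankAccount): Promise<void> {
    const events = account.getUncommittedEvents();
    if (events.length === 0) return;

    // account.version is still the last persisted version here, because
    // uncommitted events carry no metadata and so do not advance it.
    await this.eventStore.appendEvents(
      account.id,
      account.version,
      events
    );

    // Advance the in-memory version so the same instance can be saved
    // again without a spurious concurrency conflict.
    account.version += events.length;
    account.clearUncommittedEvents();
  }

  async load(id: string): Promise<BankAccount> {
    const events = await this.eventStore.getEvents(id);
    const account = new BankAccount(id);
    events.forEach(event => account.apply(event));
    return account;
  }
}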
// Usage
const eventStore = new EventStore();
const repository = new BankAccountRepository(eventStore);
// Create and use account
const account = new BankAccount('acc-123');
account.open(1000);
account.deposit(500);
account.withdraw(200);
await repository.save(account);
// Load account
const loadedAccount = await repository.load('acc-123');
console.log(loadedAccount.getState());
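The optimistic concurrency check is easiest to see with two writers that both loaded the stream at the same version. The sketch below is a stripped-down stand-in that tracks only version numbers; `VersionedStream` is a hypothetical name, and a real caller would reload the aggregate and re-run its command before retrying.

```typescript
// Hypothetical stand-in that tracks only the current version per stream.
class VersionedStream {
  private versions = new Map<string, number>();

  // Append `count` events if the caller's expected version still matches.
  append(streamId: string, expectedVersion: number, count: number): number {
    const current = this.versions.get(streamId) ?? 0;
    if (current !== expectedVersion) {
      throw new Error('Concurrency conflict');
    }
    const next = current + count;
    this.versions.set(streamId, next);
    return next;
  }
}

const stream = new VersionedStream();

// Writer A loaded the stream at version 0 and appends three events.
const afterFirst = stream.append('acc-123', 0, 3);

// Writer B also loaded at version 0, so its append is rejected.
let conflict = false;
try {
  stream.append('acc-123', 0, 1);
} catch {
  conflict = true;
}

// Writer B reloads, sees the new version, and retries successfully.
const afterRetry = stream.append('acc-123', afterFirst, 1);

console.log(afterFirst, conflict, afterRetry); // 3 true 4
```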
2. Projections (Read Models)
interface AccountReadModel {
  id: string;
  balance: number;
  transactionCount: number;
  lastActivity: number;
}

class AccountProjection {
  private accounts = new Map<string, AccountReadModel>();

  async project(event: DomainEvent): Promise<void> {
    switch (event.eventType) {
      case 'AccountOpened':
        await this.handleAccountOpened(event);
        break;
      case 'MoneyDeposited':
        await this.handleMoneyDeposited(event);
        break;
      case 'MoneyWithdrawn':
        await this.handleMoneyWithdrawn(event);
        break;
    }
  }

  private async handleAccountOpened(event: DomainEvent): Promise<void> {
    this.accounts.set(event.aggregateId, {
      id: event.aggregateId,
      balance: event.data.initialDeposit,
      transactionCount: 1,
      lastActivity: event.metadata.timestamp
    });
  }

  private async handleMoneyDeposited(event: DomainEvent): Promise<void> {
    const account = this.accounts.get(event.aggregateId);
    if (!account) return;
    account.balance += event.data.amount;
    account.transactionCount++;
    account.lastActivity = event.metadata.timestamp;
  }

  private async handleMoneyWithdrawn(event: DomainEvent): Promise<void> {
    const account = this.accounts.get(event.aggregateId);
    if (!account) return;
    account.balance -= event.data.amount;
    account.transactionCount++;
    account.lastActivity = event.metadata.timestamp;
  }

  getAccount(id: string): AccountReadModel | undefined {
    return this.accounts.get(id);
  }

  getAllAccounts(): AccountReadModel[] {
    return Array.from(this.accounts.values());
  }
}
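Because a read model is derived entirely from events, it can be discarded and rebuilt by replaying the log from the start. The following is a self-contained sketch of that rebuild step; `StoredEvent`, `Summary`, and `rebuild` are simplified, hypothetical names that flatten the event payload to a single `amount` field.

```typescript
// Simplified, hypothetical shapes: the payload is flattened to `amount`.
interface StoredEvent {
  aggregateId: string;
  eventType: 'AccountOpened' | 'MoneyDeposited' | 'MoneyWithdrawn';
  amount: number;
  timestamp: number;
}

interface Summary {
  balance: number;
  transactionCount: number;
  lastActivity: number;
}

// Rebuild the whole read model from scratch by replaying every event.
function rebuild(events: StoredEvent[]): Map<string, Summary> {
  const view = new Map<string, Summary>();
  for (const e of events) {
    const current = view.get(e.aggregateId) ?? {
      balance: 0,
      transactionCount: 0,
      lastActivity: 0,
    };
    const delta = e.eventType === 'MoneyWithdrawn' ? -e.amount : e.amount;
    view.set(e.aggregateId, {
      balance: current.balance + delta,
      transactionCount: current.transactionCount + 1,
      lastActivity: e.timestamp,
    });
  }
  return view;
}

const eventLog: StoredEvent[] = [
  { aggregateId: 'acc-123', eventType: 'AccountOpened', amount: 1000, timestamp: 1000 },
  { aggregateId: 'acc-123', eventType: 'MoneyDeposited', amount: 500, timestamp: 2000 },
  { aggregateId: 'acc-123', eventType: 'MoneyWithdrawn', amount: 200, timestamp: 3000 },
];

const readModel = rebuild(eventLog);
console.log(readModel.get('acc-123'));
// { balance: 1300, transactionCount: 3, lastActivity: 3000 }
```

Replaying the same log always yields the same view, which is what makes it safe to drop a projection and regenerate it after changing its shape.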
3. Event Store with PostgreSQL
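// Explicit import so crypto.randomUUID() resolves on Node versions
// that do not expose a global crypto object.
import * as crypto from 'crypto';
import { Pool } from 'pg';

class PostgresEventStore {
  constructor(private pool: Pool) {}

  // Call once at startup and await it before using the store. Running the
  // DDL from the constructor would leave the promise unawaited, hiding
  // failures and racing with the first query.
  async init(): Promise<void> {
    await this.createTables();
  }

  private async createTables(): Promise<void> {
    await this.pool.query(`
      CREATE TABLE IF NOT EXISTS events (
        id UUID PRIMARY KEY,
        aggregate_id VARCHAR(255) NOT NULL,
        aggregate_type VARCHAR(100) NOT NULL,
        event_type VARCHAR(100) NOT NULL,
        data JSONB NOT NULL,
        metadata JSONB NOT NULL,
        version INTEGER NOT NULL,
        created_at TIMESTAMP DEFAULT NOW(),
        UNIQUE(aggregate_id, version)
      );
      CREATE INDEX IF NOT EXISTS idx_events_aggregate
        ON events (aggregate_id, version);
      CREATE INDEX IF NOT EXISTS idx_events_type
        ON events (event_type);
    `);
  }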
  async appendEvents(
    aggregateId: string,
    expectedVersion: number,
    events: Omit<DomainEvent, 'id' | 'metadata'>[]
  ): Promise<void> {
    const client = await this.pool.connect();
    try {
      await client.query('BEGIN');

      // Check version
      const result = await client.query(
        'SELECT MAX(version) as version FROM events WHERE aggregate_id = $1',
        [aggregateId]
      );
      const currentVersion = result.rows[0].version || 0;
      if (currentVersion !== expectedVersion) {
        throw new Error('Concurrency conflict');
      }

      // Insert events
      for (let i = 0; i < events.length; i++) {
        const event = events[i];
        const version = expectedVersion + i + 1;
        await client.query(`
          INSERT INTO events (
            id, aggregate_id, aggregate_type, event_type,
            data, metadata, version
          )
          VALUES ($1, $2, $3, $4, $5, $6, $7)
        `, [
          crypto.randomUUID(),
          aggregateId,
          event.aggregateType,
          event.eventType,
          JSON.stringify(event.data),
          JSON.stringify({ timestamp: Date.now(), version }),
          version
        ]);
      }

      await client.query('COMMIT');
    } catch (error) {
      await client.query('ROLLBACK');
      // Two writers can both pass the version check above; the loser is
      // rejected by UNIQUE(aggregate_id, version). 23505 is PostgreSQL's
      // unique_violation code, so report it as a concurrency conflict.
      if ((error as { code?: string }).code === '23505') {
        throw new Error('Concurrency conflict');
      }
      throw error;
    } finally {
      client.release();
    }
  }
  async getEvents(
    aggregateId: string,
    fromVersion: number = 0
  ): Promise<DomainEvent[]> {
    const result = await this.pool.query(
      `SELECT * FROM events
       WHERE aggregate_id = $1 AND version > $2
       ORDER BY version ASC`,
      [aggregateId, fromVersion]
    );
    return result.rows.map(row => ({
      id: row.id,
      aggregateId: row.aggregate_id,
      aggregateType: row.aggregate_type,
      eventType: row.event_type,
      data: row.data,
      metadata: row.metadata
    }));
  }

  async getEventsByType(
    eventType: string,
    fromTimestamp: number = 0
  ): Promise<DomainEvent[]> {
    const result = await this.pool.query(
      `SELECT * FROM events
       WHERE event_type = $1
       AND (metadata->>'timestamp')::bigint > $2
       ORDER BY created_at ASC`,
      [eventType, fromTimestamp]
    );
    return result.rows.map(row => ({
      id: row.id,
      aggregateId: row.aggregate_id,
      aggregateType: row.aggregate_type,
      eventType: row.event_type,
      data: row.data,
      metadata: row.metadata
    }));
  }
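  async getAllEvents(
    fromPosition: number = 0,
    limit: number = 100
  ): Promise<DomainEvent[]> {
    // ids are random UUIDs, so they cannot be compared with a numeric
    // position. fromPosition is treated here as a row offset into a
    // deterministic ordering. For a global feed that must not miss rows
    // under concurrent writes, a dedicated sequence column (for example
    // BIGSERIAL) would be the more robust choice.
    const result = await this.pool.query(
      `SELECT * FROM events
       ORDER BY created_at ASC, aggregate_id ASC, version ASC
       LIMIT $2 OFFSET $1`,
      [fromPosition, limit]
    );
    return result.rows.map(row => ({
      id: row.id,
      aggregateId: row.aggregate_id,
      aggregateType: row.aggregate_type,
      eventType: row.event_type,
      data: row.data,
      metadata: row.metadata
    }));
  }
}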
4. Snapshots for Performance
interface Snapshot {
  aggregateId: string;
  version: number;
  state: any;
  createdAt: number;
}

class SnapshotStore {
  private snapshots = new Map<string, Snapshot>();

  async save(snapshot: Snapshot): Promise<void> {
    this.snapshots.set(snapshot.aggregateId, snapshot);
  }

  async get(aggregateId: string): Promise<Snapshot | null> {
    return this.snapshots.get(aggregateId) || null;
  }
}
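class SnapshotRepository {
  constructor(
    private eventStore: EventStore,
    private snapshotStore: SnapshotStore,
    private snapshotInterval: number = 10
  ) {}

  async load(id: string): Promise<BankAccount> {
    // Try to load from snapshot
    const snapshot = await this.snapshotStore.get(id);
    const fromVersion = snapshot?.version ?? 0;

    // Load events since snapshot. The in-memory EventStore has no
    // fromVersion parameter, so the filtering happens here.
    const events = await this.eventStore.getEvents(id);
    const recentEvents = events.filter(e => e.metadata.version > fromVersion);

    const account = new BankAccount(id);

    // Restore from snapshot (copies id, balance, isOpen, and version)
    if (snapshot) {
      Object.assign(account, snapshot.state);
    }

    // Apply recent events
    recentEvents.forEach(event => account.apply(event));
    return account;
  }

  async save(account: BankAccount): Promise<void> {
    const events = account.getUncommittedEvents();
    if (events.length === 0) return;

    const previousVersion = account.version;
    await this.eventStore.appendEvents(
      account.id,
      previousVersion,
      events
    );

    // Advance to the persisted version before snapshotting. Otherwise the
    // snapshot would pair the new state with the old version, and the same
    // events would be applied a second time on load.
    account.version = previousVersion + events.length;

    // Create a snapshot whenever this save crossed a multiple of the
    // interval, even if it did not land exactly on one.
    const crossedInterval =
      Math.floor(account.version / this.snapshotInterval) >
      Math.floor(previousVersion / this.snapshotInterval);
    if (crossedInterval) {
      await this.snapshotStore.save({
        aggregateId: account.id,
        version: account.version,
        state: account.getState(),
        createdAt: Date.now()
      });
    }

    account.clearUncommittedEvents();
  }
}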
Best Practices
✅ DO
- Store events immutably
- Version your events
- Use optimistic concurrency
- Create snapshots for performance
- Use projections for queries
- Keep events small and focused
- Include metadata (timestamp, user, etc.)
- Handle event versioning/migration
❌ DON'T
- Mutate past events
- Store current state only
- Skip concurrency checks
- Query event store for reads
- Make events too large
- Forget about event schema evolution
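For the event versioning and schema evolution points above, one common approach is upcasting: stored events stay untouched, and older versions are converted to the current shape as they are read. The sketch below assumes a hypothetical schema change in which version 2 of `MoneyDeposited` adds a `currency` field defaulting to `'USD'`; the `schemaVersion` field and the `upcast` function are illustrative names, not part of this skill's code.

```typescript
// Hypothetical envelope carrying an explicit schema version.
interface VersionedEvent {
  eventType: string;
  schemaVersion: number;
  data: Record<string, unknown>;
}

// Convert old event shapes to the current one at read time.
// Assumed change: MoneyDeposited v2 adds `currency`, defaulting to 'USD'.
function upcast(event: VersionedEvent): VersionedEvent {
  if (event.eventType === 'MoneyDeposited' && event.schemaVersion === 1) {
    return {
      ...event,
      schemaVersion: 2,
      data: { ...event.data, currency: 'USD' },
    };
  }
  return event; // already current, or a type with no migrations
}

const stored: VersionedEvent = {
  eventType: 'MoneyDeposited',
  schemaVersion: 1,
  data: { amount: 500 },
};

const upgraded = upcast(stored);
console.log(upgraded.schemaVersion, upgraded.data); // 2 { amount: 500, currency: 'USD' }
console.log(stored.schemaVersion, stored.data); // 1 { amount: 500 }
```

The stored event is copied rather than modified, which keeps the log immutable while letting aggregates and projections handle only the latest shape.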
Resources
Quick Install
/plugin add https://github.com/aj-geddes/useful-ai-prompts/tree/main/event-sourcing
Copy and paste this command in Claude Code to install this skill.
