MCP Hub

cqrs-implementation

majiayu000
Updated: Yesterday
23 views
View on GitHub
Tags: meta, design

About

This skill provides a comprehensive guide to implementing the CQRS pattern for scalable system architectures. It helps developers separate read and write models, optimize query performance, and build event-sourced systems. Use it when reads must scale independently from writes, or when commands and queries call for different data models.

Quick Install

Claude Code

Plugin command (recommended)
/plugin add https://github.com/majiayu000/claude-skill-registry
Git clone (alternative)
git clone https://github.com/majiayu000/claude-skill-registry.git ~/.claude/skills/cqrs-implementation

Copy and paste this command into Claude Code to install the skill.

Documentation

CQRS Implementation

Comprehensive guide to implementing CQRS (Command Query Responsibility Segregation) patterns.

When to Use This Skill

  • Separating read and write concerns
  • Scaling reads independently from writes
  • Building event-sourced systems
  • Optimizing complex query scenarios
  • Different read/write data models needed
  • High-performance reporting requirements

Core Concepts

1. CQRS Architecture

                    ┌─────────────┐
                    │   Client    │
                    └──────┬──────┘
                           │
              ┌────────────┴────────────┐
              │                         │
              ▼                         ▼
       ┌─────────────┐          ┌─────────────┐
       │  Commands   │          │   Queries   │
       │    API      │          │    API      │
       └──────┬──────┘          └──────┬──────┘
              │                         │
              ▼                         ▼
       ┌─────────────┐          ┌─────────────┐
       │  Command    │          │   Query     │
       │  Handlers   │          │  Handlers   │
       └──────┬──────┘          └──────┬──────┘
              │                         │
              ▼                         ▼
       ┌─────────────┐          ┌─────────────┐
       │   Write     │─────────►│    Read     │
       │   Model     │  Events  │   Model     │
       └─────────────┘          └─────────────┘

2. Key Components

Component         Responsibility
Command           Intent to change state
Command Handler   Validates and executes commands
Event             Record of state change
Query             Request for data
Query Handler     Retrieves data from read model
Projector         Updates read model from events
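
To make the Event and Projector rows concrete before the templates below, here is a minimal sketch of an event record as the write model might emit it. The field names are illustrative rather than prescribed by this skill; a fuller projector example follows Template 4.

# Minimal event record sketch (field names are illustrative).
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict
import uuid


@dataclass
class Event:
    event_type: str                      # e.g. "OrderCreated"
    stream_id: str                       # e.g. "Order-<order_id>"
    data: Dict[str, Any]                 # state-change payload
    version: int = 1                     # position within its own stream
    global_position: int = 0             # position across all streams (used by projectors)
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: datetime = field(default_factory=datetime.utcnow)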

Templates

Template 1: Command Infrastructure

from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import TypeVar, Generic, Dict, Any, Optional, Type
from datetime import datetime
import uuid

# Command base
# kw_only (Python 3.10+) keeps these defaulted fields out of the positional
# signature, so subclasses can declare required fields without hitting
# "non-default argument follows default argument".
@dataclass
class Command:
    command_id: Optional[str] = field(default=None, kw_only=True)
    timestamp: Optional[datetime] = field(default=None, kw_only=True)

    def __post_init__(self):
        self.command_id = self.command_id or str(uuid.uuid4())
        self.timestamp = self.timestamp or datetime.utcnow()


# Concrete commands
@dataclass
class CreateOrder(Command):
    customer_id: str
    items: list
    shipping_address: dict


@dataclass
class AddOrderItem(Command):
    order_id: str
    product_id: str
    quantity: int
    price: float


@dataclass
class CancelOrder(Command):
    order_id: str
    reason: str


# Command handler base
T = TypeVar('T', bound=Command)

class CommandHandler(ABC, Generic[T]):
    @abstractmethod
    async def handle(self, command: T) -> Any:
        pass


# Command bus
class CommandBus:
    def __init__(self):
        self._handlers: Dict[Type[Command], CommandHandler] = {}

    def register(self, command_type: Type[Command], handler: CommandHandler):
        self._handlers[command_type] = handler

    async def dispatch(self, command: Command) -> Any:
        handler = self._handlers.get(type(command))
        if not handler:
            raise ValueError(f"No handler for {type(command).__name__}")
        return await handler.handle(command)


# Command handler implementation (Order is the write-side aggregate, defined elsewhere)
class CreateOrderHandler(CommandHandler[CreateOrder]):
    def __init__(self, order_repository, event_store):
        self.order_repository = order_repository
        self.event_store = event_store

    async def handle(self, command: CreateOrder) -> str:
        # Validate
        if not command.items:
            raise ValueError("Order must have at least one item")

        # Create aggregate
        order = Order.create(
            customer_id=command.customer_id,
            items=command.items,
            shipping_address=command.shipping_address
        )

        # Persist events
        await self.event_store.append_events(
            stream_id=f"Order-{order.id}",
            stream_type="Order",
            events=order.uncommitted_events
        )

        return order.id
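
For reference, a minimal wiring sketch showing how the bus is registered and dispatched. The order_repository and event_store arguments are assumed to be concrete implementations supplied elsewhere.

# Wiring sketch (assumes concrete order_repository / event_store objects exist).
async def handle_create_order_example(order_repository, event_store):
    bus = CommandBus()
    bus.register(CreateOrder, CreateOrderHandler(order_repository, event_store))

    order_id = await bus.dispatch(CreateOrder(
        customer_id="cust-123",
        items=[{"product_id": "sku-1", "quantity": 2, "price": 9.99}],
        shipping_address={"city": "Springfield", "zip": "12345"},
    ))
    return order_id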

Template 2: Query Infrastructure

from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from typing import TypeVar, Generic, Dict, Any, List, Optional, Type

# Item type for paginated results; declared before PaginatedResult so the
# class can reference it (the query-bound TypeVar T is defined further down).
TItem = TypeVar('TItem')

# Query base
@dataclass
class Query:
    pass


# Concrete queries
@dataclass
class GetOrderById(Query):
    order_id: str


@dataclass
class GetCustomerOrders(Query):
    customer_id: str
    status: Optional[str] = None
    page: int = 1
    page_size: int = 20


@dataclass
class SearchOrders(Query):
    query: str
    filters: Optional[dict] = None
    sort_by: str = "created_at"
    sort_order: str = "desc"


# Query result types
@dataclass
class OrderView:
    order_id: str
    customer_id: str
    status: str
    total_amount: float
    item_count: int
    created_at: datetime
    shipped_at: Optional[datetime] = None


@dataclass
class PaginatedResult(Generic[TItem]):
    items: List[TItem]
    total: int
    page: int
    page_size: int

    @property
    def total_pages(self) -> int:
        return (self.total + self.page_size - 1) // self.page_size


# Query handler base
T = TypeVar('T', bound=Query)
R = TypeVar('R')

class QueryHandler(ABC, Generic[T, R]):
    @abstractmethod
    async def handle(self, query: T) -> R:
        pass


# Query bus
class QueryBus:
    def __init__(self):
        self._handlers: Dict[Type[Query], QueryHandler] = {}

    def register(self, query_type: Type[Query], handler: QueryHandler):
        self._handlers[query_type] = handler

    async def dispatch(self, query: Query) -> Any:
        handler = self._handlers.get(type(query))
        if not handler:
            raise ValueError(f"No handler for {type(query).__name__}")
        return await handler.handle(query)


# Query handler implementation
class GetOrderByIdHandler(QueryHandler[GetOrderById, Optional[OrderView]]):
    def __init__(self, read_db):
        self.read_db = read_db

    async def handle(self, query: GetOrderById) -> Optional[OrderView]:
        async with self.read_db.acquire() as conn:
            row = await conn.fetchrow(
                """
                SELECT order_id, customer_id, status, total_amount,
                       item_count, created_at, shipped_at
                FROM order_views
                WHERE order_id = $1
                """,
                query.order_id
            )
            if row:
                return OrderView(**dict(row))
            return None


class GetCustomerOrdersHandler(QueryHandler[GetCustomerOrders, PaginatedResult[OrderView]]):
    def __init__(self, read_db):
        self.read_db = read_db

    async def handle(self, query: GetCustomerOrders) -> PaginatedResult[OrderView]:
        async with self.read_db.acquire() as conn:
            # Build query with optional status filter
            where_clause = "customer_id = $1"
            params = [query.customer_id]

            if query.status:
                where_clause += " AND status = $2"
                params.append(query.status)

            # Get total count
            total = await conn.fetchval(
                f"SELECT COUNT(*) FROM order_views WHERE {where_clause}",
                *params
            )

            # Get paginated results
            offset = (query.page - 1) * query.page_size
            rows = await conn.fetch(
                f"""
                SELECT order_id, customer_id, status, total_amount,
                       item_count, created_at, shipped_at
                FROM order_views
                WHERE {where_clause}
                ORDER BY created_at DESC
                LIMIT ${len(params) + 1} OFFSET ${len(params) + 2}
                """,
                *params, query.page_size, offset
            )

            return PaginatedResult(
                items=[OrderView(**dict(row)) for row in rows],
                total=total,
                page=query.page,
                page_size=query.page_size
            )
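
The query handlers above assume an asyncpg-style pool (acquire, fetch, fetchrow, fetchval with $n placeholders) and an order_views table kept current by a projector. A minimal setup sketch follows; the column types are inferred from the SELECT statements above and are illustrative only.

import asyncpg  # assumed driver; the handlers only rely on acquire/fetch/fetchrow/fetchval

# Columns inferred from the queries above; exact types are an assumption.
ORDER_VIEWS_DDL = """
CREATE TABLE IF NOT EXISTS order_views (
    order_id     TEXT PRIMARY KEY,
    customer_id  TEXT NOT NULL,
    status       TEXT NOT NULL,
    total_amount DOUBLE PRECISION NOT NULL,
    item_count   INTEGER NOT NULL,
    created_at   TIMESTAMPTZ NOT NULL,
    shipped_at   TIMESTAMPTZ
)
"""

async def create_read_db(dsn: str):
    """Create the read-side pool and ensure the denormalized view table exists."""
    read_db = await asyncpg.create_pool(dsn)
    async with read_db.acquire() as conn:
        await conn.execute(ORDER_VIEWS_DDL)
    return read_db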

Template 3: FastAPI CQRS Application

from datetime import datetime
from typing import List, Optional

from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel

# The command/query classes and the CommandBus / QueryBus come from Templates 1 and 2.

app = FastAPI()

# Request/Response models
class CreateOrderRequest(BaseModel):
    customer_id: str
    items: List[dict]
    shipping_address: dict


class OrderResponse(BaseModel):
    order_id: str
    customer_id: str
    status: str
    total_amount: float
    item_count: int
    created_at: datetime


# Dependency injection
def get_command_bus() -> CommandBus:
    return app.state.command_bus


def get_query_bus() -> QueryBus:
    return app.state.query_bus


# Command endpoints (POST, PUT, DELETE)
@app.post("/orders", response_model=dict)
async def create_order(
    request: CreateOrderRequest,
    command_bus: CommandBus = Depends(get_command_bus)
):
    command = CreateOrder(
        customer_id=request.customer_id,
        items=request.items,
        shipping_address=request.shipping_address
    )
    order_id = await command_bus.dispatch(command)
    return {"order_id": order_id}


@app.post("/orders/{order_id}/items")
async def add_item(
    order_id: str,
    product_id: str,
    quantity: int,
    price: float,
    command_bus: CommandBus = Depends(get_command_bus)
):
    command = AddOrderItem(
        order_id=order_id,
        product_id=product_id,
        quantity=quantity,
        price=price
    )
    await command_bus.dispatch(command)
    return {"status": "item_added"}


@app.delete("/orders/{order_id}")
async def cancel_order(
    order_id: str,
    reason: str,
    command_bus: CommandBus = Depends(get_command_bus)
):
    command = CancelOrder(order_id=order_id, reason=reason)
    await command_bus.dispatch(command)
    return {"status": "cancelled"}


# Query endpoints (GET)
# Note: the static /orders/search path must be declared before the
# parameterized /orders/{order_id} route, otherwise FastAPI matches
# "search" as an order_id.
@app.get("/orders/search")
async def search_orders(
    q: str,
    sort_by: str = "created_at",
    query_bus: QueryBus = Depends(get_query_bus)
):
    query = SearchOrders(query=q, sort_by=sort_by)
    return await query_bus.dispatch(query)


@app.get("/orders/{order_id}", response_model=OrderResponse)
async def get_order(
    order_id: str,
    query_bus: QueryBus = Depends(get_query_bus)
):
    query = GetOrderById(order_id=order_id)
    result = await query_bus.dispatch(query)
    if not result:
        raise HTTPException(status_code=404, detail="Order not found")
    return result


@app.get("/customers/{customer_id}/orders")
async def get_customer_orders(
    customer_id: str,
    status: Optional[str] = None,
    page: int = 1,
    page_size: int = 20,
    query_bus: QueryBus = Depends(get_query_bus)
):
    query = GetCustomerOrders(
        customer_id=customer_id,
        status=status,
        page=page,
        page_size=page_size
    )
    return await query_bus.dispatch(query)
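
The get_command_bus and get_query_bus dependencies above expect the buses to be attached to app.state. A minimal wiring sketch, assuming the repository, event store, and read-database objects are constructed elsewhere:

# Wiring sketch: attach the buses where the Depends() providers look for them.
# order_repository, event_store and read_db are assumed to be built elsewhere.
def wire_buses(app: FastAPI, order_repository, event_store, read_db) -> None:
    command_bus = CommandBus()
    command_bus.register(CreateOrder, CreateOrderHandler(order_repository, event_store))

    query_bus = QueryBus()
    query_bus.register(GetOrderById, GetOrderByIdHandler(read_db))
    query_bus.register(GetCustomerOrders, GetCustomerOrdersHandler(read_db))

    app.state.command_bus = command_bus
    app.state.query_bus = query_bus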

Template 4: Read Model Synchronization

import asyncio
import logging
from typing import List

logger = logging.getLogger(__name__)


class ReadModelSynchronizer:
    """Keeps read models in sync with events."""

    def __init__(self, event_store, read_db, projections: List["Projection"]):
        # Each Projection exposes: name, handles() -> event types, apply(event),
        # and clear(); a sketch of that interface follows this template.
        # _get_checkpoint / _save_checkpoint (not shown) persist the last
        # processed global position per projection.
        self.event_store = event_store
        self.read_db = read_db
        self.projections = {p.name: p for p in projections}

    async def run(self):
        """Continuously sync read models."""
        while True:
            for name, projection in self.projections.items():
                await self._sync_projection(projection)
            await asyncio.sleep(0.1)

    async def _sync_projection(self, projection: Projection):
        checkpoint = await self._get_checkpoint(projection.name)

        events = await self.event_store.read_all(
            from_position=checkpoint,
            limit=100
        )

        for event in events:
            if event.event_type in projection.handles():
                try:
                    await projection.apply(event)
                except Exception as e:
                    # Log error, possibly retry or skip
                    logger.error(f"Projection error: {e}")
                    continue

            await self._save_checkpoint(projection.name, event.global_position)

    async def rebuild_projection(self, projection_name: str):
        """Rebuild a projection from scratch."""
        projection = self.projections[projection_name]

        # Clear existing data
        await projection.clear()

        # Reset checkpoint
        await self._save_checkpoint(projection_name, 0)

        # Rebuild
        while True:
            checkpoint = await self._get_checkpoint(projection_name)
            events = await self.event_store.read_all(checkpoint, 1000)

            if not events:
                break

            for event in events:
                if event.event_type in projection.handles():
                    await projection.apply(event)

            await self._save_checkpoint(
                projection_name,
                events[-1].global_position
            )
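
The synchronizer relies on a small projection interface that the template does not spell out. Below is a minimal sketch of that interface plus one projection that maintains the order_views table, assuming events carry event_type, data, and timestamp attributes as in the earlier event sketch; event and field names are illustrative.

from abc import ABC, abstractmethod


class Projection(ABC):
    """Interface expected by ReadModelSynchronizer (a sketch, not a fixed API)."""

    name: str

    @abstractmethod
    def handles(self) -> set:
        """Event types this projection consumes."""

    @abstractmethod
    async def apply(self, event) -> None:
        """Update the read model from one event."""

    @abstractmethod
    async def clear(self) -> None:
        """Drop projected data before a rebuild."""


class OrderViewProjection(Projection):
    """Projects order events into the denormalized order_views table."""

    name = "order_views"

    def __init__(self, read_db):
        self.read_db = read_db

    def handles(self) -> set:
        return {"OrderCreated", "OrderShipped"}

    async def apply(self, event) -> None:
        async with self.read_db.acquire() as conn:
            if event.event_type == "OrderCreated":
                data = event.data  # assumed payload shape
                await conn.execute(
                    """
                    INSERT INTO order_views
                        (order_id, customer_id, status, total_amount, item_count, created_at)
                    VALUES ($1, $2, 'pending', $3, $4, $5)
                    ON CONFLICT (order_id) DO NOTHING
                    """,
                    data["order_id"], data["customer_id"],
                    data["total_amount"], data["item_count"], event.timestamp,
                )
            elif event.event_type == "OrderShipped":
                await conn.execute(
                    "UPDATE order_views SET status = 'shipped', shipped_at = $2 WHERE order_id = $1",
                    event.data["order_id"], event.timestamp,
                )

    async def clear(self) -> None:
        async with self.read_db.acquire() as conn:
            await conn.execute("TRUNCATE order_views")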

Template 5: Eventual Consistency Handling

import asyncio
import time


class ConsistentQueryHandler:
    """Query handler that can wait for consistency."""

    # execute_query (not shown) delegates to the normal query handler path.
    def __init__(self, read_db, event_store):
        self.read_db = read_db
        self.event_store = event_store

    async def query_after_command(
        self,
        query: Query,
        expected_version: int,
        stream_id: str,
        timeout: float = 5.0
    ):
        """
        Execute query, ensuring read model is at expected version.
        Used for read-your-writes consistency.
        """
        start_time = time.time()

        while time.time() - start_time < timeout:
            # Check if read model is caught up
            projection_version = await self._get_projection_version(stream_id)

            if projection_version >= expected_version:
                return await self.execute_query(query)

            # Wait a bit and retry
            await asyncio.sleep(0.1)

        # Timeout - return stale data with warning
        return {
            "data": await self.execute_query(query),
            "_warning": "Data may be stale"
        }

    async def _get_projection_version(self, stream_id: str) -> int:
        """Get the last processed event version for a stream."""
        async with self.read_db.acquire() as conn:
            return await conn.fetchval(
                "SELECT last_event_version FROM projection_state WHERE stream_id = $1",
                stream_id
            ) or 0
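
Building on the templates above, a usage sketch of read-your-writes consistency after a command. The expected version of 1 assumes the order stream starts with a single OrderCreated event; adjust it if the command handler returns the actual appended version.

# Usage sketch: wait for the projection to catch up with the write just made.
async def create_and_fetch_order(command_bus, consistent_queries, request):
    order_id = await command_bus.dispatch(CreateOrder(
        customer_id=request["customer_id"],
        items=request["items"],
        shipping_address=request["shipping_address"],
    ))

    return await consistent_queries.query_after_command(
        query=GetOrderById(order_id=order_id),
        expected_version=1,
        stream_id=f"Order-{order_id}",
    )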

Best Practices

Do's

  • Separate command and query models - Different needs
  • Use eventual consistency - Accept propagation delay
  • Validate in command handlers - Before state change
  • Denormalize read models - Optimize for queries
  • Version your events - For schema evolution (see the sketch below)
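
One common shape for event versioning is to store a schema version with each event payload and upcast older versions when they are read. A minimal sketch, with illustrative event and field names:

# Event-versioning sketch: old payloads are upcast before being applied.
from dataclasses import dataclass


@dataclass
class OrderCreatedV1:
    order_id: str
    customer_id: str


@dataclass
class OrderCreatedV2:
    order_id: str
    customer_id: str
    currency: str = "USD"  # field added in v2, with a safe default


def upcast_order_created(payload: dict, version: int) -> OrderCreatedV2:
    """Upgrade older payloads to the current schema before applying them."""
    if version == 1:
        return OrderCreatedV2(order_id=payload["order_id"],
                              customer_id=payload["customer_id"])
    return OrderCreatedV2(**payload)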

Don'ts

  • Don't query in commands - Use only for writes
  • Don't couple read/write schemas - Independent evolution
  • Don't over-engineer - Start simple
  • Don't ignore consistency SLAs - Define acceptable lag

Resources

GitHub Repository

majiayu000/claude-skill-registry
Path: skills/cqrs-implementation

Related Skills

content-collections

Meta

This skill provides a production-tested setup for Content Collections, a TypeScript-first tool that transforms Markdown/MDX files into type-safe data collections with Zod validation. Use it when building blogs, documentation sites, or content-heavy Vite + React applications to ensure type safety and automatic content validation. It covers everything from Vite plugin configuration and MDX compilation to deployment optimization and schema validation.

View skill

creating-opencode-plugins

Meta

This skill provides the structure and API specifications for creating OpenCode plugins that hook into 25+ event types like commands, files, and LSP operations. It offers implementation patterns for JavaScript/TypeScript modules that intercept and extend the AI assistant's lifecycle. Use it when you need to build event-driven plugins for monitoring, custom handling, or extending OpenCode's capabilities.

View skill

polymarket

Meta

This skill enables developers to build applications with the Polymarket prediction markets platform, including API integration for trading and market data. It also provides real-time data streaming via WebSocket to monitor live trades and market activity. Use it for implementing trading strategies or creating tools that process live market updates.

View skill

langchain

Meta

LangChain is a framework for building LLM applications using agents, chains, and RAG pipelines. It supports multiple LLM providers, offers 500+ integrations, and includes features like tool calling and memory management. Use it for rapid prototyping and deploying production systems like chatbots, autonomous agents, and question-answering services.

View skill