
configuring-dapr-pubsub

majiayu000
Updated: Today · 22 views
Documentation · general

About

This skill configures Dapr pub/sub components for event-driven microservices using Kafka or Redis backends. It helps developers wire up inter-agent communication, set up event subscriptions, and integrate Dapr sidecars with the appropriate component configuration and Kubernetes deployments. Use it when implementing Dapr-based messaging patterns, not for direct Kafka clients or non-Dapr messaging approaches.

Quick Install

Claude Code

Plugin command (recommended)
/plugin add https://github.com/majiayu000/claude-skill-registry
Git clone (alternative)
git clone https://github.com/majiayu000/claude-skill-registry.git ~/.claude/skills/configuring-dapr-pubsub

Copy and paste this command into Claude Code to install the skill.

Documentation

Configuring Dapr Pub/Sub

Wire event-driven microservices using Dapr pub/sub with Kafka or Redis backends.

Quick Start

# components/pubsub.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.kafka
  version: v1
  metadata:
    - name: brokers
      value: "my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092"
    - name: authType
      value: "none"
    - name: disableTls
      value: "true"
# Apply component
kubectl apply -f components/pubsub.yaml

# Test with the Dapr CLI (requires an app with app-id "publisher" running alongside its sidecar)
dapr publish --publish-app-id publisher --pubsub pubsub --topic test --data '{"msg":"hello"}'

Component Configurations

Kafka (Production)

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub
spec:
  type: pubsub.kafka
  version: v1
  metadata:
    # Required
    - name: brokers
      value: "my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092"
    - name: authType
      value: "none"

    # Consumer settings
    - name: consumerGroup
      value: "{namespace}-{appId}"  # Templated per deployment
    - name: consumeRetryInterval
      value: "100ms"
    - name: heartbeatInterval
      value: "3s"
    - name: sessionTimeout
      value: "10s"

    # Performance
    - name: maxMessageBytes
      value: "1048576"  # 1MB
    - name: channelBufferSize
      value: "256"

Kafka with SASL Authentication

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub-secure
spec:
  type: pubsub.kafka
  version: v1
  metadata:
    - name: brokers
      value: "kafka.example.com:9093"
    - name: authType
      value: "password"
    - name: saslUsername
      value: "dapr-user"
    - name: saslPassword
      secretKeyRef:
        name: kafka-secrets
        key: password
    - name: saslMechanism
      value: "SCRAM-SHA-256"

Redis (Development/Simple)

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: redis-pubsub
spec:
  type: pubsub.redis
  version: v1
  metadata:
    - name: redisHost
      value: "redis-master.redis.svc.cluster.local:6379"
    - name: redisPassword
      secretKeyRef:
        name: redis-secrets
        key: password

Subscription Patterns

Declarative Subscription (Recommended)

# subscriptions/task-events.yaml
apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
  name: task-created-subscription
spec:
  pubsubname: pubsub
  topic: task-created
  routes:
    default: /dapr/task-created
  scopes:
    - triage-agent
    - concepts-agent

Programmatic Subscription (FastAPI)

from fastapi import FastAPI, Request

app = FastAPI()

@app.get("/dapr/subscribe")
async def subscribe():
    """Dapr calls this to discover subscriptions."""
    return [
        {
            "pubsubname": "pubsub",
            "topic": "task-created",
            "route": "/dapr/task-created"
        },
        {
            "pubsubname": "pubsub",
            "topic": "task-completed",
            "route": "/dapr/task-completed"
        }
    ]

@app.post("/dapr/task-created")
async def handle_task_created(request: Request):
    """Handle incoming CloudEvent."""
    event = await request.json()

    # CloudEvent wrapper - data is nested
    task_data = event.get("data", event)
    task_id = task_data.get("task_id")

    # Process event
    print(f"Task created: {task_id}")

    return {"status": "SUCCESS"}

Publishing Events

From FastAPI Service

import httpx

DAPR_URL = "http://localhost:3500"

async def publish_event(topic: str, data: dict):
    """Publish event through Dapr sidecar."""
    async with httpx.AsyncClient() as client:
        response = await client.post(
            f"{DAPR_URL}/v1.0/publish/pubsub/{topic}",
            json=data,
            headers={"Content-Type": "application/json"}
        )
        response.raise_for_status()

# Usage
await publish_event("task-created", {
    "task_id": "123",
    "title": "Learn Python",
    "user_id": "user-456"
})

With CloudEvent Metadata

import uuid

async def publish_cloudevent(topic: str, data: dict, event_type: str):
    """Publish with explicit CloudEvent fields."""
    async with httpx.AsyncClient() as client:
        await client.post(
            f"{DAPR_URL}/v1.0/publish/pubsub/{topic}",
            json=data,
            headers={
                "Content-Type": "application/cloudevents+json",
                "ce-specversion": "1.0",
                "ce-type": event_type,
                "ce-source": "triage-agent",
                "ce-id": str(uuid.uuid4())
            }
        )
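
Usage mirrors publish_event above; for example (the event type string is illustrative):

# Usage
await publish_cloudevent("task-created", {
    "task_id": "123",
    "title": "Learn Python",
    "user_id": "user-456"
}, event_type="task.created")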

Kubernetes Deployment

Component Scoping

Limit component access to specific apps:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.kafka
  version: v1
  metadata:
    - name: brokers
      value: "kafka:9092"
scopes:
  - triage-agent
  - concepts-agent
  - debug-agent

App Deployment with Dapr Sidecar

apiVersion: apps/v1
kind: Deployment
metadata:
  name: triage-agent
spec:
  replicas: 2
  selector:
    matchLabels:
      app: triage-agent
  template:
    metadata:
      labels:
        app: triage-agent
      annotations:
        dapr.io/enabled: "true"
        dapr.io/app-id: "triage-agent"
        dapr.io/app-port: "8000"
        dapr.io/enable-api-logging: "true"
    spec:
      containers:
        - name: triage-agent
          image: myapp/triage-agent:latest
          ports:
            - containerPort: 8000
          env:
            - name: DAPR_HTTP_PORT
              value: "3500"

Multi-Agent Routing Pattern

Triage Agent → Specialist Agents

# triage_agent.py
from fastapi import FastAPI, Request
import httpx

app = FastAPI()
DAPR_URL = "http://localhost:3500"

@app.post("/api/question")
async def handle_question(request: Request):
    data = await request.json()
    question = data["question"]

    # Route based on content
    if "python" in question.lower() or "code" in question.lower():
        topic = "concepts-request"
    elif "error" in question.lower() or "bug" in question.lower():
        topic = "debug-request"
    else:
        topic = "concepts-request"  # Default

    # Publish to appropriate agent
    async with httpx.AsyncClient() as client:
        await client.post(
            f"{DAPR_URL}/v1.0/publish/pubsub/{topic}",
            json={
                "question": question,
                "user_id": data["user_id"],
                "session_id": data["session_id"]
            }
        )

    return {"status": "routed", "topic": topic}

Specialist Agent Handler

# concepts_agent.py
from fastapi import FastAPI, Request
import httpx

app = FastAPI()
DAPR_URL = "http://localhost:3500"

@app.get("/dapr/subscribe")
async def subscribe():
    return [{"pubsubname": "pubsub", "topic": "concepts-request", "route": "/dapr/handle"}]

@app.post("/dapr/handle")
async def handle_concepts_request(request: Request):
    event = await request.json()
    data = event.get("data", event)

    # Process with LLM
    response = await process_with_llm(data["question"])

    # Publish response
    async with httpx.AsyncClient() as client:
        await client.post(
            f"{DAPR_URL}/v1.0/publish/pubsub/response-ready",
            json={
                "session_id": data["session_id"],
                "response": response,
                "agent": "concepts"
            }
        )

    return {"status": "SUCCESS"}

Local Development

Run with Dapr CLI

# Start subscriber first
dapr run --app-id concepts-agent --app-port 8001 --dapr-http-port 3501 \
  --resources-path ./components -- uvicorn concepts_agent:app --port 8001

# Start publisher
dapr run --app-id triage-agent --app-port 8000 --dapr-http-port 3500 \
  --resources-path ./components -- uvicorn triage_agent:app --port 8000
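
Once both processes are up, a quick end-to-end check is to publish through the triage sidecar and watch the concepts agent log the event. A minimal sketch, assuming the ports and topic name used above:

# smoke_test.py - publish a test event through the triage sidecar (port 3500)
import asyncio
import httpx

async def main():
    async with httpx.AsyncClient() as client:
        resp = await client.post(
            "http://localhost:3500/v1.0/publish/pubsub/concepts-request",
            json={
                "question": "What is a Python decorator?",
                "user_id": "user-456",
                "session_id": "session-1",
            },
        )
        resp.raise_for_status()  # Dapr returns 204 No Content on success
        print("published:", resp.status_code)

asyncio.run(main())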

Docker Compose with Dapr

version: "3.8"
services:
  triage-agent:
    build: ./services/triage
    ports:
      - "8000:8000"

  triage-agent-dapr:
    image: daprio/daprd:latest
    command: ["./daprd",
      "--app-id", "triage-agent",
      "--app-port", "8000",
      "--dapr-http-port", "3500",
      "--resources-path", "/components"
    ]
    volumes:
      - ./components:/components
    network_mode: "service:triage-agent"
    depends_on:
      - triage-agent

  kafka:
    image: confluentinc/cp-kafka:latest
    # ... kafka config

Troubleshooting

Check Dapr Sidecar

# View sidecar logs
kubectl logs deploy/triage-agent -c daprd

# Check component registration
curl http://localhost:3500/v1.0/metadata
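
The metadata endpoint can also be queried from code to confirm the pubsub component is actually registered with the sidecar. A hedged sketch, assuming the sidecar is reachable on localhost:3500 (the response schema may differ slightly between Dapr versions):

# check_components.py - list components registered with the local sidecar
import httpx

resp = httpx.get("http://localhost:3500/v1.0/metadata")
resp.raise_for_status()

# The metadata response includes a "components" list with name/type entries
for component in resp.json().get("components", []):
    print(component.get("name"), "-", component.get("type"))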

Common Issues

Error | Cause | Fix
component not found | Component not loaded | Check --resources-path or the K8s namespace
connection refused | Kafka not reachable | Verify the broker address in the component
consumer group rebalance | Multiple instances | Use a unique consumerGroup per app
event not received | Wrong topic/route | Check the subscription config

Debug Event Flow

# Publish a test event (through a running app's sidecar)
dapr publish --publish-app-id my-app --pubsub pubsub --topic test --data '{"test": true}'

# Check consumer logs
kubectl logs deploy/my-app -c daprd | grep -i subscribe
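
If an event is published but never received, it also helps to check what the app itself advertises to Dapr. A minimal sketch against the programmatic subscription endpoint shown earlier, assuming the app listens on localhost:8000:

# Confirm the app advertises the expected topics and routes
import httpx

subscriptions = httpx.get("http://localhost:8000/dapr/subscribe").json()
for sub in subscriptions:
    print(sub["pubsubname"], sub["topic"], "->", sub["route"])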

Verification

Run: python scripts/verify.py

Related Skills

  • deploying-kafka-k8s - Kafka cluster setup with Strimzi
  • scaffolding-fastapi-dapr - FastAPI services with Dapr
  • scaffolding-openai-agents - Agent orchestration patterns

GitHub Repository

majiayu000/claude-skill-registry
Path: skills/configuring-dapr-pubsub
