configuring-dapr-pubsub
About
This skill configures Dapr pub/sub components for event-driven microservices using Kafka or Redis backends. It helps developers wire agent-to-agent communication, set up event subscriptions, and integrate Dapr sidecars with proper component configuration and Kubernetes deployment. Use it when implementing Dapr-based messaging patterns, but not for direct Kafka clients or non-Dapr messaging approaches.
Quick Install
Claude Code
Recommended (run in Claude Code):
/plugin add https://github.com/majiayu000/claude-skill-registry
Or clone the registry directly:
git clone https://github.com/majiayu000/claude-skill-registry.git ~/.claude/skills/configuring-dapr-pubsub
Documentation
Configuring Dapr Pub/Sub
Wire event-driven microservices using Dapr pub/sub with Kafka or Redis backends.
Quick Start
# components/pubsub.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.kafka
  version: v1
  metadata:
    - name: brokers
      value: "my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092"
    - name: authType
      value: "none"
    - name: disableTls
      value: "true"
# Apply component
kubectl apply -f components/pubsub.yaml
# Test with Dapr CLI
dapr run --app-id publisher -- dapr publish --publish-app-id publisher --pubsub pubsub --topic test --data '{"msg":"hello"}'
Component Configurations
Kafka (Production)
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub
spec:
  type: pubsub.kafka
  version: v1
  metadata:
    # Required
    - name: brokers
      value: "my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092"
    - name: authType
      value: "none"
    # Consumer settings
    - name: consumerGroup
      value: "{namespace}-{appID}" # Templated per deployment; placeholders are case-sensitive
    - name: consumeRetryInterval
      value: "100ms"
    - name: heartbeatInterval
      value: "3s"
    - name: sessionTimeout
      value: "10s"
    # Performance
    - name: maxMessageBytes
      value: "1048576" # 1 MiB
    - name: channelBufferSize
      value: "256"
Kafka with SASL Authentication
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kafka-pubsub-secure
spec:
  type: pubsub.kafka
  version: v1
  metadata:
    - name: brokers
      value: "kafka.example.com:9093"
    - name: authType
      value: "password"
    - name: saslUsername
      value: "dapr-user"
    - name: saslPassword
      secretKeyRef:
        name: kafka-secrets
        key: password
    # The Dapr Kafka component names the SCRAM mechanisms "SHA-256" and "SHA-512"
    - name: saslMechanism
      value: "SHA-256"
Redis (Development/Simple)
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: redis-pubsub
spec:
  type: pubsub.redis
  version: v1
  metadata:
    - name: redisHost
      value: "redis-master.redis.svc.cluster.local:6379"
    - name: redisPassword
      secretKeyRef:
        name: redis-secrets
        key: password
Subscription Patterns
Declarative Subscription (Recommended)
# subscriptions/task-events.yaml
apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
  name: task-created-subscription
spec:
  pubsubname: pubsub
  topic: task-created
  routes:
    default: /dapr/task-created
scopes:
  - triage-agent
  - concepts-agent
Programmatic Subscription (FastAPI)
from fastapi import FastAPI, Request

app = FastAPI()


@app.get("/dapr/subscribe")
async def subscribe():
    """Dapr calls this to discover subscriptions."""
    return [
        {
            "pubsubname": "pubsub",
            "topic": "task-created",
            "route": "/dapr/task-created"
        },
        {
            "pubsubname": "pubsub",
            "topic": "task-completed",
            "route": "/dapr/task-completed"
        }
    ]


@app.post("/dapr/task-created")
async def handle_task_created(request: Request):
    """Handle incoming CloudEvent."""
    event = await request.json()
    # CloudEvent wrapper - data is nested
    task_data = event.get("data", event)
    task_id = task_data.get("task_id")
    # Process event
    print(f"Task created: {task_id}")
    return {"status": "SUCCESS"}
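Dapr reads the `status` field of the handler's response to decide what happens to the message: `SUCCESS` acknowledges it, `RETRY` asks for redelivery, and `DROP` discards it. The following is a minimal sketch of mapping processing outcomes to those statuses. The `process_task` function and the convention that `ValueError` means a malformed payload are assumptions made for illustration, not part of this skill.

```python
def process_task(task_data: dict) -> None:
    """Hypothetical business logic; raises ValueError on malformed input."""
    if "task_id" not in task_data:
        raise ValueError("missing task_id")


def handle_event(event: dict) -> dict:
    """Return the status Dapr should act on for this event."""
    # Dapr wraps payloads in a CloudEvent, so the payload sits under "data".
    task_data = event.get("data", event)
    try:
        process_task(task_data)
    except ValueError:
        # Malformed payload: retrying cannot help, so discard it.
        return {"status": "DROP"}
    except Exception:
        # Possibly transient failure: ask Dapr to redeliver.
        return {"status": "RETRY"}
    return {"status": "SUCCESS"}
```

In a FastAPI route, the body of the handler would reduce to `return handle_event(await request.json())`.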
Publishing Events
From FastAPI Service
import asyncio

import httpx

DAPR_URL = "http://localhost:3500"


async def publish_event(topic: str, data: dict):
    """Publish event through Dapr sidecar."""
    async with httpx.AsyncClient() as client:
        response = await client.post(
            f"{DAPR_URL}/v1.0/publish/pubsub/{topic}",
            json=data,
            headers={"Content-Type": "application/json"}
        )
        response.raise_for_status()


# Usage (inside an async function, await publish_event(...) directly)
asyncio.run(publish_event("task-created", {
    "task_id": "123",
    "title": "Learn Python",
    "user_id": "user-456"
}))
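The publish endpoint also accepts per-message options as `metadata.<key>` query parameters. Below is a small sketch of a URL builder for that. The helper name `build_publish_url` is hypothetical, and `ttlInSeconds` is used only as an example key; whether a given key is honored depends on the pub/sub component.

```python
from typing import Optional
from urllib.parse import quote, urlencode

DAPR_URL = "http://localhost:3500"


def build_publish_url(pubsub: str, topic: str, metadata: Optional[dict] = None) -> str:
    """Build the sidecar publish URL, adding metadata.* query parameters if given."""
    url = f"{DAPR_URL}/v1.0/publish/{quote(pubsub, safe='')}/{quote(topic, safe='')}"
    if metadata:
        # Each metadata key is sent as a "metadata.<key>" query parameter.
        query = urlencode({f"metadata.{key}": value for key, value in metadata.items()})
        url = f"{url}?{query}"
    return url


print(build_publish_url("pubsub", "task-created", {"ttlInSeconds": 60}))
# http://localhost:3500/v1.0/publish/pubsub/task-created?metadata.ttlInSeconds=60
```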
With CloudEvent Metadata
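import uuid


async def publish_cloudevent(topic: str, data: dict, event_type: str):
    """Publish with explicit CloudEvent fields."""
    # With application/cloudevents+json the request body must itself be the
    # CloudEvent envelope; the fields are not passed as ce-* headers.
    event = {
        "specversion": "1.0",
        "type": event_type,
        "source": "triage-agent",
        "id": str(uuid.uuid4()),
        "datacontenttype": "application/json",
        "data": data,
    }
    async with httpx.AsyncClient() as client:
        response = await client.post(
            f"{DAPR_URL}/v1.0/publish/pubsub/{topic}",
            json=event,
            headers={"Content-Type": "application/cloudevents+json"}
        )
        response.raise_for_status()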
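When an event is published as `application/cloudevents+json`, it needs to carry the attributes that CloudEvents 1.0 marks as required: `specversion`, `id`, `source`, and `type`. A minimal pre-publish check might look like the sketch below; the helper name `missing_cloudevent_fields` is an assumption for illustration.

```python
from typing import List

# Attributes that the CloudEvents 1.0 specification marks as required.
REQUIRED_ATTRIBUTES = ("specversion", "id", "source", "type")


def missing_cloudevent_fields(event: dict) -> List[str]:
    """Return the required CloudEvent attributes that are absent or empty."""
    return [name for name in REQUIRED_ATTRIBUTES if not event.get(name)]


event = {
    "specversion": "1.0",
    "id": "evt-1",
    "source": "triage-agent",
    "type": "task.created",
    "data": {"task_id": "123"},
}
print(missing_cloudevent_fields(event))                  # []
print(missing_cloudevent_fields({"data": {"x": 1}}))     # ['specversion', 'id', 'source', 'type']
```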
Kubernetes Deployment
Component Scoping
Limit component access to specific apps:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.kafka
  version: v1
  metadata:
    - name: brokers
      value: "kafka:9092"
scopes:
  - triage-agent
  - concepts-agent
  - debug-agent
App Deployment with Dapr Sidecar
apiVersion: apps/v1
kind: Deployment
metadata:
  name: triage-agent
spec:
  replicas: 2
  selector:
    matchLabels:
      app: triage-agent
  template:
    metadata:
      labels:
        app: triage-agent
      annotations:
        dapr.io/enabled: "true"
        dapr.io/app-id: "triage-agent"
        dapr.io/app-port: "8000"
        dapr.io/enable-api-logging: "true"
    spec:
      containers:
        - name: triage-agent
          image: myapp/triage-agent:latest
          ports:
            - containerPort: 8000
          env:
            - name: DAPR_HTTP_PORT
              value: "3500"
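The Python examples in this document hard-code the sidecar address as `http://localhost:3500`, while the deployment exposes the port to the app as `DAPR_HTTP_PORT`. A small helper can read the variable instead, which also helps locally when several sidecars run on different ports. This is a minimal sketch; the helper name `dapr_base_url` and the fallback to port 3500 are assumptions.

```python
import os
from typing import Mapping, Optional


def dapr_base_url(env: Optional[Mapping[str, str]] = None) -> str:
    """Build the sidecar base URL from DAPR_HTTP_PORT, defaulting to 3500."""
    env = os.environ if env is None else env
    port = env.get("DAPR_HTTP_PORT", "3500")
    return f"http://localhost:{port}"


print(dapr_base_url({"DAPR_HTTP_PORT": "3501"}))  # http://localhost:3501
```

With this in place, `DAPR_URL = dapr_base_url()` could replace the hard-coded constant in each service.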
Multi-Agent Routing Pattern
Triage Agent → Specialist Agents
# triage_agent.py
from fastapi import FastAPI, Request
import httpx

app = FastAPI()
DAPR_URL = "http://localhost:3500"


@app.post("/api/question")
async def handle_question(request: Request):
    data = await request.json()
    question = data["question"]
    # Route based on content
    if "python" in question.lower() or "code" in question.lower():
        topic = "concepts-request"
    elif "error" in question.lower() or "bug" in question.lower():
        topic = "debug-request"
    else:
        topic = "concepts-request"  # Default
    # Publish to appropriate agent
    async with httpx.AsyncClient() as client:
        await client.post(
            f"{DAPR_URL}/v1.0/publish/pubsub/{topic}",
            json={
                "question": question,
                "user_id": data["user_id"],
                "session_id": data["session_id"]
            }
        )
    return {"status": "routed", "topic": topic}
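The routing rule in the triage handler can be pulled out into a pure function so it can be tested without a running sidecar. The sketch below mirrors the keyword checks above; the function name `choose_topic` is an assumption. Note that the checks run in order, so a question that mentions both "python" and "error" goes to the concepts agent.

```python
def choose_topic(question: str) -> str:
    """Pick the pub/sub topic for a question using simple keyword matching."""
    text = question.lower()
    if "python" in text or "code" in text:
        return "concepts-request"
    if "error" in text or "bug" in text:
        return "debug-request"
    return "concepts-request"  # Default


print(choose_topic("Why does my loop raise an error?"))  # debug-request
print(choose_topic("Python error in my code"))           # concepts-request
```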
Specialist Agent Handler
# concepts_agent.py
import os

from fastapi import FastAPI, Request
import httpx

app = FastAPI()
# dapr run and the sidecar injector expose the sidecar port as DAPR_HTTP_PORT,
# so this also works when the agent's sidecar is not on 3500.
DAPR_URL = f"http://localhost:{os.getenv('DAPR_HTTP_PORT', '3500')}"


async def process_with_llm(question: str) -> str:
    """Placeholder: replace with the real LLM call."""
    return f"(placeholder answer for: {question})"


@app.get("/dapr/subscribe")
async def subscribe():
    return [{"pubsubname": "pubsub", "topic": "concepts-request", "route": "/dapr/handle"}]


@app.post("/dapr/handle")
async def handle_concepts_request(request: Request):
    event = await request.json()
    data = event.get("data", event)
    # Process with LLM
    response = await process_with_llm(data["question"])
    # Publish response
    async with httpx.AsyncClient() as client:
        await client.post(
            f"{DAPR_URL}/v1.0/publish/pubsub/response-ready",
            json={
                "session_id": data["session_id"],
                "response": response,
                "agent": "concepts"
            }
        )
    return {"status": "SUCCESS"}
Local Development
Run with Dapr CLI
# Start subscriber first
dapr run --app-id concepts-agent --app-port 8001 --dapr-http-port 3501 \
  --resources-path ./components -- uvicorn concepts_agent:app --port 8001
# Start publisher
dapr run --app-id triage-agent --app-port 8000 --dapr-http-port 3500 \
  --resources-path ./components -- uvicorn triage_agent:app --port 8000
Docker Compose with Dapr
version: "3.8"
services:
  triage-agent:
    build: ./services/triage
    ports:
      - "8000:8000"
  triage-agent-dapr:
    image: daprio/daprd:latest
    command: ["./daprd",
      "--app-id", "triage-agent",
      "--app-port", "8000",
      "--dapr-http-port", "3500",
      "--resources-path", "/components"
      ]
    volumes:
      - ./components:/components
    network_mode: "service:triage-agent"
    depends_on:
      - triage-agent
  kafka:
    image: confluentinc/cp-kafka:latest
    # ... kafka config
Troubleshooting
Check Dapr Sidecar
# View sidecar logs
kubectl logs deploy/triage-agent -c daprd
# Check component registration
curl http://localhost:3500/v1.0/metadata
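The metadata response lists the components the sidecar loaded, so the same check can be scripted. The sketch below works on an already-parsed response body; the helper names and the trimmed sample are assumptions for illustration, and real responses carry more fields.

```python
def loaded_components(metadata: dict) -> dict:
    """Map component name to component type from a metadata response body."""
    return {c["name"]: c["type"] for c in metadata.get("components", [])}


def has_pubsub(metadata: dict, name: str = "pubsub") -> bool:
    """True if a pub/sub component with this name was loaded by the sidecar."""
    return loaded_components(metadata).get(name, "").startswith("pubsub.")


# Illustrative, trimmed response body.
sample = {
    "id": "triage-agent",
    "components": [
        {"name": "pubsub", "type": "pubsub.kafka", "version": "v1"},
    ],
}

print(has_pubsub(sample))             # True
print(has_pubsub(sample, "missing"))  # False
```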
Common Issues
| Error | Cause | Fix |
|---|---|---|
| component not found | Component not loaded | Check --resources-path or K8s namespace |
| connection refused | Kafka not reachable | Verify broker address in component |
| consumer group rebalance | Multiple instances | Use unique consumerGroup per app |
| event not received | Wrong topic/route | Check subscription config |
Debug Event Flow
# Publish test event
dapr publish --publish-app-id triage-agent --pubsub pubsub --topic test --data '{"test": true}'
# Check consumer logs
kubectl logs deploy/my-app -c daprd | grep -i subscribe
Verification
Run: python scripts/verify.py
Related Skills
deploying-kafka-k8s - Kafka cluster setup with Strimzi
scaffolding-fastapi-dapr - FastAPI services with Dapr
scaffolding-openai-agents - Agent orchestration patterns
