.claude/skills/configuring-dapr-pubsub/SKILL.md
Configures Dapr pub/sub components for event-driven microservices with Kafka or Redis. Use when wiring agent-to-agent communication, setting up event subscriptions, or integrating Dapr sidecars. Covers component configuration, subscription patterns, publishing events, and Kubernetes deployment. NOT when using direct Kafka clients or non-Dapr messaging patterns.
npx skillsauth add Asmayaseen/hackathon-2 configuring-dapr-pubsubInstall this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
3 of 9 scanners reported clean
Some scanners were skipped, did not run, or reported a non-clean status. Review each row below.
Wire event-driven microservices using Dapr pub/sub with Kafka or Redis backends.
# components/pubsub.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pubsub
spec:
type: pubsub.kafka
version: v1
metadata:
- name: brokers
value: "my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092"
- name: authType
value: "none"
- name: disableTls
value: "true"
# Apply component
kubectl apply -f components/pubsub.yaml
# Test with Dapr CLI
dapr run --app-id publisher -- dapr publish --pubsub pubsub --topic test --data '{"msg":"hello"}'
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: kafka-pubsub
spec:
type: pubsub.kafka
version: v1
metadata:
# Required
- name: brokers
value: "my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092"
- name: authType
value: "none"
# Consumer settings
- name: consumerGroup
value: "{namespace}-{appId}" # Templated per deployment
- name: consumeRetryInterval
value: "100ms"
- name: heartbeatInterval
value: "3s"
- name: sessionTimeout
value: "10s"
# Performance
- name: maxMessageBytes
value: "1048576" # 1MB
- name: channelBufferSize
value: "256"
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: kafka-pubsub-secure
spec:
type: pubsub.kafka
version: v1
metadata:
- name: brokers
value: "kafka.example.com:9093"
- name: authType
value: "password"
- name: saslUsername
value: "dapr-user"
- name: saslPassword
secretKeyRef:
name: kafka-secrets
key: password
- name: saslMechanism
value: "SCRAM-SHA-256"
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: redis-pubsub
spec:
type: pubsub.redis
version: v1
metadata:
- name: redisHost
value: "redis-master.redis.svc.cluster.local:6379"
- name: redisPassword
secretKeyRef:
name: redis-secrets
key: password
# subscriptions/task-events.yaml
apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
name: task-created-subscription
spec:
pubsubname: pubsub
topic: task-created
routes:
default: /dapr/task-created
scopes:
- triage-agent
- concepts-agent
from fastapi import FastAPI, Request
app = FastAPI()
@app.get("/dapr/subscribe")
async def subscribe():
"""Dapr calls this to discover subscriptions."""
return [
{
"pubsubname": "pubsub",
"topic": "task-created",
"route": "/dapr/task-created"
},
{
"pubsubname": "pubsub",
"topic": "task-completed",
"route": "/dapr/task-completed"
}
]
@app.post("/dapr/task-created")
async def handle_task_created(request: Request):
"""Handle incoming CloudEvent."""
event = await request.json()
# CloudEvent wrapper - data is nested
task_data = event.get("data", event)
task_id = task_data.get("task_id")
# Process event
print(f"Task created: {task_id}")
return {"status": "SUCCESS"}
import httpx
DAPR_URL = "http://localhost:3500"
async def publish_event(topic: str, data: dict):
"""Publish event through Dapr sidecar."""
async with httpx.AsyncClient() as client:
response = await client.post(
f"{DAPR_URL}/v1.0/publish/pubsub/{topic}",
json=data,
headers={"Content-Type": "application/json"}
)
response.raise_for_status()
# Usage
await publish_event("task-created", {
"task_id": "123",
"title": "Learn Python",
"user_id": "user-456"
})
async def publish_cloudevent(topic: str, data: dict, event_type: str):
"""Publish with explicit CloudEvent fields."""
async with httpx.AsyncClient() as client:
await client.post(
f"{DAPR_URL}/v1.0/publish/pubsub/{topic}",
json=data,
headers={
"Content-Type": "application/cloudevents+json",
"ce-specversion": "1.0",
"ce-type": event_type,
"ce-source": "triage-agent",
"ce-id": str(uuid.uuid4())
}
)
Limit component access to specific apps:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pubsub
spec:
type: pubsub.kafka
version: v1
metadata:
- name: brokers
value: "kafka:9092"
scopes:
- triage-agent
- concepts-agent
- debug-agent
apiVersion: apps/v1
kind: Deployment
metadata:
name: triage-agent
spec:
replicas: 2
selector:
matchLabels:
app: triage-agent
template:
metadata:
labels:
app: triage-agent
annotations:
dapr.io/enabled: "true"
dapr.io/app-id: "triage-agent"
dapr.io/app-port: "8000"
dapr.io/enable-api-logging: "true"
spec:
containers:
- name: triage-agent
image: myapp/triage-agent:latest
ports:
- containerPort: 8000
env:
- name: DAPR_HTTP_PORT
value: "3500"
# triage_agent.py
from fastapi import FastAPI, Request
import httpx
app = FastAPI()
DAPR_URL = "http://localhost:3500"
@app.post("/api/question")
async def handle_question(request: Request):
data = await request.json()
question = data["question"]
# Route based on content
if "python" in question.lower() or "code" in question.lower():
topic = "concepts-request"
elif "error" in question.lower() or "bug" in question.lower():
topic = "debug-request"
else:
topic = "concepts-request" # Default
# Publish to appropriate agent
async with httpx.AsyncClient() as client:
await client.post(
f"{DAPR_URL}/v1.0/publish/pubsub/{topic}",
json={
"question": question,
"user_id": data["user_id"],
"session_id": data["session_id"]
}
)
return {"status": "routed", "topic": topic}
# concepts_agent.py
from fastapi import FastAPI, Request
import httpx
app = FastAPI()
DAPR_URL = "http://localhost:3500"
@app.get("/dapr/subscribe")
async def subscribe():
return [{"pubsubname": "pubsub", "topic": "concepts-request", "route": "/dapr/handle"}]
@app.post("/dapr/handle")
async def handle_concepts_request(request: Request):
event = await request.json()
data = event.get("data", event)
# Process with LLM
response = await process_with_llm(data["question"])
# Publish response
async with httpx.AsyncClient() as client:
await client.post(
f"{DAPR_URL}/v1.0/publish/pubsub/response-ready",
json={
"session_id": data["session_id"],
"response": response,
"agent": "concepts"
}
)
return {"status": "SUCCESS"}
# Start subscriber first
dapr run --app-id concepts-agent --app-port 8001 --dapr-http-port 3501 \
--resources-path ./components -- uvicorn concepts:app --port 8001
# Start publisher
dapr run --app-id triage-agent --app-port 8000 --dapr-http-port 3500 \
--resources-path ./components -- uvicorn triage:app --port 8000
version: "3.8"
services:
triage-agent:
build: ./services/triage
ports:
- "8000:8000"
triage-agent-dapr:
image: daprio/daprd:latest
command: ["./daprd",
"--app-id", "triage-agent",
"--app-port", "8000",
"--dapr-http-port", "3500",
"--resources-path", "/components"
]
volumes:
- ./components:/components
network_mode: "service:triage-agent"
depends_on:
- triage-agent
kafka:
image: confluentinc/cp-kafka:latest
# ... kafka config
# View sidecar logs
kubectl logs deploy/triage-agent -c daprd
# Check component registration
curl http://localhost:3500/v1.0/metadata
| Error | Cause | Fix |
|-------|-------|-----|
| component not found | Component not loaded | Check --resources-path or K8s namespace |
| connection refused | Kafka not reachable | Verify broker address in component |
| consumer group rebalance | Multiple instances | Use unique consumerGroup per app |
| event not received | Wrong topic/route | Check subscription config |
# Publish test event
dapr publish --pubsub pubsub --topic test --data '{"test": true}'
# Check consumer logs
kubectl logs deploy/my-app -c daprd | grep -i subscribe
Run: python scripts/verify.py
deploying-kafka-k8s - Kafka cluster setup with Strimziscaffolding-fastapi-dapr - FastAPI services with Daprscaffolding-openai-agents - Agent orchestration patternsdevelopment
Systematic methodology for debugging bugs, test failures, and unexpected behavior. Use when encountering any technical issue before proposing fixes. Covers root cause investigation, pattern analysis, hypothesis testing, and fix implementation. Use ESPECIALLY when under time pressure, "just one quick fix" seems obvious, or you've already tried multiple fixes. NOT for exploratory code reading.
development
Build beautiful, accessible UIs with shadcn/ui components in Next.js. Use when creating forms, dialogs, tables, sidebars, or any UI components. Covers installation, component patterns, react-hook-form + Zod validation, and dark mode setup. NOT when building non-React applications or using different component libraries.
tools
Implement real-time streaming UI patterns for AI chat applications. Use when adding response lifecycle handlers, progress indicators, client effects, or thread state synchronization. Covers onResponseStart/End, onEffect, ProgressUpdateEvent, and client tools. NOT when building basic chat without real-time feedback.
tools
Builds AI agents using OpenAI Agents SDK with async/await patterns and multi-agent orchestration. Use when creating tutoring agents, building agent handoffs, implementing tool-calling agents, or orchestrating multiple specialists. Covers Agent class, Runner patterns, function tools, guardrails, and streaming responses. NOT when using raw OpenAI API without SDK or other agent frameworks like LangChain.