skills/message_queues/SKILL.md
Async communication patterns using message brokers and task queues. Use when building event-driven systems, background job processing, or service decoupling. Covers Kafka (event streaming), RabbitMQ (complex routing), NATS (cloud-native), Redis Streams, Celery (Python), BullMQ (TypeScript), Temporal (workflows), and event sourcing patterns.
Use message queues to implement asynchronous communication for event-driven architectures, background job processing, and service decoupling.
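Choose a message broker based on the primary need:

- Event streaming → Apache Kafka
- Background jobs → Task queues (Celery, BullMQ)
- Workflows → Temporal
- Cloud-native microservices → NATS
- Complex routing → RabbitMQ
- Simple queues → Redis Streams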

| Broker | Throughput | Latency (p99) | Best For |
|--------|-----------|---------------|----------|
| Kafka | 500K-1M msg/s | 10-50ms | Event streaming |
| NATS JetStream | 200K-400K msg/s | Sub-ms to 5ms | Cloud-native microservices |
| RabbitMQ | 50K-100K msg/s | 5-20ms | Task queues, complex routing |
| Redis Streams | 100K+ msg/s | Sub-ms | Simple queues, caching |

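Kafka producer and consumer in Python. See examples/kafka-python/ for working code.

    from confluent_kafka import Producer, Consumer

    # Producer: send one keyed message, then block until it is delivered
    producer = Producer({'bootstrap.servers': 'localhost:9092'})
    producer.produce('orders', key='order_123', value='{"status": "created"}')
    producer.flush()

    # Consumer: join a group; start from the earliest offset if none is committed
    consumer = Consumer({
        'bootstrap.servers': 'localhost:9092',
        'group.id': 'order-processors',
        'auto.offset.reset': 'earliest'
    })
    consumer.subscribe(['orders'])

    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                continue  # no message arrived within the timeout
            if msg.error():
                print(f"Consumer error: {msg.error()}")
                continue
            process_order(msg.value())  # process_order is application-defined
    finally:
        consumer.close()  # leave the consumer group cleanly
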
Celery task with retries in Python. See examples/celery-image-processing/ for full implementation.

    from celery import Celery

    app = Celery('tasks', broker='redis://localhost:6379')

    @app.task(bind=True, max_retries=3)
    def process_image(self, image_url: str):
        try:
            # expensive_image_processing and RecoverableError are application-defined
            result = expensive_image_processing(image_url)
            return result
        except RecoverableError as e:
            # Retry after 60 seconds, up to max_retries times
            raise self.retry(exc=e, countdown=60)

BullMQ queue and worker in TypeScript. See examples/bullmq-webhook-processor/ for full implementation.

    import { Queue, Worker } from 'bullmq'

    const connection = { host: 'localhost', port: 6379 }

    const queue = new Queue('webhooks', { connection })

    // Enqueue job
    await queue.add('send-webhook', {
      url: 'https://example.com/webhook',
      payload: { event: 'order.created' }
    })

    // Process jobs
    const worker = new Worker('webhooks', async job => {
      const response = await fetch(job.data.url, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(job.data.payload)
      })
      // Throw on a non-2xx response so the job is marked as failed
      if (!response.ok) throw new Error(`Webhook returned ${response.status}`)
    }, { connection })

Temporal workflow in Python. See examples/temporal-order-saga/ for saga pattern implementation.

    from datetime import timedelta

    from temporalio import workflow, activity

    # reserve_inventory and charge_payment are activities defined elsewhere
    # with @activity.defn. Compensation for a failed step is omitted here.

    @workflow.defn
    class OrderSagaWorkflow:
        @workflow.run
        async def run(self, order_id: str) -> str:
            # Step 1: Reserve inventory
            inventory_id = await workflow.execute_activity(
                reserve_inventory,
                order_id,
                start_to_close_timeout=timedelta(seconds=10),
            )

            # Step 2: Charge payment
            payment_id = await workflow.execute_activity(
                charge_payment,
                order_id,
                start_to_close_timeout=timedelta(seconds=30),
            )

            return f"Order {order_id} completed"

Use: Domain.Entity.Action.Version

Examples:

- order.created.v1
- user.profile.updated.v2
- payment.failed.v1

Event envelope:

    {
      "event_type": "order.created.v2",
      "event_id": "uuid-here",
      "timestamp": "2025-12-02T10:00:00Z",
      "version": "2.0",
      "data": {
        "order_id": "ord_123",
        "customer_id": "cus_456"
      },
      "metadata": {
        "producer": "order-service",
        "trace_id": "abc123",
        "correlation_id": "xyz789"
      }
    }

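The naming convention and envelope above can be enforced at the producer. The following is a minimal sketch, not a definitive implementation: `make_event`, the regular expression, and the rule that `version` is the major version from the event name followed by `.0` are all assumptions made for illustration.

```python
import re
import uuid
from datetime import datetime, timezone

# Assumed convention: lowercase dot-separated segments ending in v<N>,
# e.g. "order.created.v1" or "user.profile.updated.v2".
EVENT_TYPE_RE = re.compile(r"^[a-z_]+(\.[a-z_]+)+\.v(\d+)$")


def make_event(event_type: str, data: dict, producer: str, trace_id: str = "") -> dict:
    """Build an envelope shaped like the example schema, rejecting badly named events."""
    match = EVENT_TYPE_RE.match(event_type)
    if match is None:
        raise ValueError(f"event_type does not follow the naming convention: {event_type!r}")
    major = match.group(2)  # the digits after the final ".v"
    return {
        "event_type": event_type,
        "event_id": str(uuid.uuid4()),
        "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "version": f"{major}.0",  # assumption: version mirrors the name's major version
        "data": data,
        "metadata": {"producer": producer, "trace_id": trace_id},
    }


event = make_event("order.created.v2", {"order_id": "ord_123"}, producer="order-service")
```

Validating the name where the event is created keeps malformed event types out of the broker, so consumers do not each need their own defensive parsing.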
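Route failed messages to a dead letter queue (DLQ) when they hit an unrecoverable error or exhaust their retries:

    from celery.exceptions import Reject

    # acks_late=True is needed for Reject to have any effect
    @app.task(bind=True, max_retries=3, acks_late=True)
    def process_order(self, order_id: str):
        try:
            result = perform_processing(order_id)
            return result
        except UnrecoverableError as e:
            # send_to_dlq and UnrecoverableError are application-defined
            send_to_dlq(order_id, str(e))
            raise Reject(e, requeue=False)
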
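The retry-then-dead-letter flow can be illustrated without a broker. This is a sketch under stated assumptions: the in-memory `deque`, the `drain` function, and the shape of the DLQ record are hypothetical stand-ins, not part of Celery or any broker API.

```python
from collections import deque
from typing import Callable

MAX_RETRIES = 3  # mirrors max_retries=3 in the task above


def drain(queue: deque, handler: Callable[[str], None], dlq: list) -> None:
    """Process (payload, retries) pairs; re-enqueue failures until retries run out."""
    while queue:
        payload, retries = queue.popleft()
        try:
            handler(payload)
        except Exception as exc:
            if retries >= MAX_RETRIES:
                # Retries exhausted: keep the payload and the error for inspection.
                dlq.append({"payload": payload, "error": str(exc), "retries": retries})
            else:
                queue.append((payload, retries + 1))


def handler(payload: str) -> None:
    """Hypothetical handler that always fails for the payload 'bad'."""
    if payload == "bad":
        raise ValueError("cannot process")


queue = deque([("ok", 0), ("bad", 0)])
dlq: list = []
drain(queue, handler, dlq)
```

After `drain` returns, the queue is empty and the failing message sits in `dlq` with its last error, rather than being dropped or retried forever.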
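Make endpoints idempotent by caching each result under a client-supplied idempotency key:

    import json

    from fastapi import FastAPI, Header

    app = FastAPI()

    @app.post("/process")
    async def process_payment(
        payment_data: dict,
        idempotency_key: str = Header(...)  # required, so requests cannot share a None key
    ):
        # Check if already processed (redis_client is an application-defined Redis client)
        cached_result = redis_client.get(f"idempotency:{idempotency_key}")
        if cached_result:
            return {"status": "already_processed", "result": json.loads(cached_result)}

        result = process_payment_logic(payment_data)
        # Redis stores strings, so serialize the result; keep it for 24 hours
        redis_client.setex(f"idempotency:{idempotency_key}", 86400, json.dumps(result))
        return {"status": "processed", "result": result}
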
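The same check-then-store logic can be exercised without Redis. The sketch below is illustrative only: `IdempotencyStore` and `handle_once` are hypothetical names, and the in-memory dictionary stands in for the Redis keys with a TTL. It does not address the race between the check and the store, which a real deployment would need to handle atomically.

```python
import time
from typing import Any, Callable, Optional


class IdempotencyStore:
    """In-memory stand-in for Redis keys with a TTL (illustrative only)."""

    def __init__(self, ttl_seconds: float = 86400, clock: Callable[[], float] = time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock
        self._entries: dict = {}  # key -> (expires_at, result)

    def get(self, key: str) -> Optional[Any]:
        entry = self._entries.get(key)
        if entry is None:
            return None
        expires_at, result = entry
        if self._clock() >= expires_at:
            del self._entries[key]  # expired: behave as if never stored
            return None
        return result

    def put(self, key: str, result: Any) -> None:
        self._entries[key] = (self._clock() + self._ttl, result)


def handle_once(store: IdempotencyStore, key: str, operation: Callable[[], Any]) -> dict:
    """Run operation once per key within the TTL; replay the stored result afterwards."""
    cached = store.get(key)
    if cached is not None:
        return {"status": "already_processed", "result": cached}
    result = operation()
    store.put(key, result)
    return {"status": "processed", "result": result}


calls = []


def charge() -> str:
    calls.append("charged")
    return "payment-ok"


store = IdempotencyStore()
first = handle_once(store, "key-1", charge)
second = handle_once(store, "key-1", charge)  # replayed, charge() is not called again
```

Injecting the clock makes expiry testable without sleeping.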
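Stream job status to the browser with Server-Sent Events:

    import asyncio
    import json

    # EventSourceResponse is assumed to come from the sse-starlette package
    from sse_starlette.sse import EventSourceResponse

    # FastAPI endpoint for real-time job status
    @app.get("/status/{task_id}")
    async def task_status_stream(task_id: str):
        async def event_generator():
            while True:
                task = celery_app.AsyncResult(task_id)
                if task.state == 'PROGRESS':  # custom state reported by the task
                    yield {"event": "progress", "data": json.dumps(task.info.get('progress', 0))}
                elif task.state == 'SUCCESS':
                    yield {"event": "complete", "data": json.dumps(task.result)}
                    break
                elif task.state == 'FAILURE':
                    # Stop polling instead of looping forever on a failed task
                    yield {"event": "failed", "data": json.dumps(str(task.result))}
                    break
                await asyncio.sleep(0.5)

        return EventSourceResponse(event_generator())
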
React component that consumes the status stream:

    import { useEffect, useState } from 'react'

    // toast and ProgressBar are assumed to come from the application's UI library
    export function JobStatus({ jobId }: { jobId: string }) {
      const [progress, setProgress] = useState(0)

      useEffect(() => {
        const eventSource = new EventSource(`/api/status/${jobId}`)

        eventSource.addEventListener('progress', (e) => {
          setProgress(JSON.parse(e.data))
        })

        eventSource.addEventListener('complete', (e) => {
          toast({ title: 'Job complete', description: JSON.parse(e.data) })
          eventSource.close()
        })

        // Close the connection when the component unmounts or jobId changes
        return () => eventSource.close()
      }, [jobId])

      return <ProgressBar value={progress} />
    }

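Because the client calls `JSON.parse(e.data)`, each event's data line has to carry valid JSON. The sketch below shows the wire format of a single frame; `format_sse` and `parse_sse` are hypothetical helpers written for illustration, and `parse_sse` only handles frames produced by `format_sse`, not the full Server-Sent Events grammar.

```python
import json
from typing import Any


def format_sse(event: str, data: Any) -> str:
    """Serialize one frame: an event name, a single-line JSON data field, a blank line."""
    return f"event: {event}\ndata: {json.dumps(data)}\n\n"


def parse_sse(frame: str) -> dict:
    """Parse a frame produced by format_sse back into its event name and data."""
    fields = {}
    for line in frame.strip().split("\n"):
        name, _, value = line.partition(": ")
        fields[name] = value
    return {"event": fields["event"], "data": json.loads(fields["data"])}


progress_frame = format_sse("progress", 40)
complete_frame = format_sse("complete", {"report_url": "/reports/1"})
```

Here `progress_frame` is `"event: progress\ndata: 40\n\n"`, which the browser delivers to the `progress` listener with `e.data` equal to `"40"`.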
For comprehensive documentation, see reference files:

- references/kafka.md for partitioning, consumer groups, exactly-once semantics
- references/rabbitmq.md for exchanges, bindings, routing patterns
- references/nats.md for JetStream, request-reply patterns
- references/redis-streams.md for consumer groups, acknowledgments
- references/celery.md for periodic tasks, canvas (workflows), monitoring
- references/bullmq.md for job prioritization, flows, Bull Board monitoring
- references/temporal-workflows.md for saga patterns, signals, queries
- references/event-patterns.md for event sourcing, CQRS, outbox pattern

Common anti-patterns and their fixes:

    # ❌ BAD: Blocks request thread
    @app.post("/generate-report")
    def generate_report(user_id: str):
        report = expensive_computation(user_id)  # 5 minutes!
        return report

    # ✅ GOOD: Enqueue background job
    @app.post("/generate-report")
    async def generate_report(user_id: str):
        task = generate_report_task.delay(user_id)
        return {"task_id": task.id}


    # ❌ BAD: Processes duplicates
    @app.task
    def send_email(email: str):
        send_email_service(email)  # Sends twice if retried!

    # ✅ GOOD: Idempotent with deduplication
    @app.task
    def send_email(email: str, idempotency_key: str):
        if redis.exists(f"sent:{idempotency_key}"):
            return "already_sent"
        send_email_service(email)
        redis.setex(f"sent:{idempotency_key}", 86400, "1")


    # ❌ BAD: Failed messages lost forever
    @app.task(max_retries=3)
    def risky_task(data):
        process(data)  # If all retries fail, data disappears

    # ✅ GOOD: DLQ for manual inspection
    @app.task(bind=True, max_retries=3)  # bind=True provides self
    def risky_task(self, data):
        try:
            process(data)
        except Exception as e:
            if self.request.retries >= self.max_retries:
                send_to_dlq(data, str(e))  # retries exhausted: park for inspection
                raise
            raise self.retry(exc=e)


    # ❌ BAD: Kafka is not designed for RPC
    def get_user_profile(user_id: str):
        kafka_producer.send("user_requests", {"user_id": user_id})
        # How to correlate response? Kafka is asynchronous!

    # ✅ GOOD: Use NATS request-reply or HTTP/gRPC
    async def get_user_profile(nc, user_id: str):
        # nc is assumed to be a connected nats-py client
        response = await nc.request("user.profile", user_id.encode(), timeout=1.0)
        return response.data

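The question in the bad example, how to correlate a response with its request, is what a request-reply layer answers. The following sketch shows the idea over two one-way in-memory queues. `RequestReplyClient`, `profile_service`, and the queue transport are hypothetical, chosen only to make the mechanism visible; this is not how NATS is implemented.

```python
import asyncio
import uuid


class RequestReplyClient:
    """Matches replies to requests by correlation ID over two one-way queues."""

    def __init__(self, requests: asyncio.Queue, replies: asyncio.Queue):
        self._requests = requests
        self._replies = replies
        self._pending: dict = {}  # correlation_id -> Future awaiting the reply

    async def request(self, payload: str, timeout: float = 1.0) -> str:
        correlation_id = str(uuid.uuid4())
        future = asyncio.get_running_loop().create_future()
        self._pending[correlation_id] = future
        await self._requests.put((correlation_id, payload))
        try:
            return await asyncio.wait_for(future, timeout)
        finally:
            self._pending.pop(correlation_id, None)

    async def dispatch_replies(self) -> None:
        """Hand each reply to the caller waiting on its correlation ID."""
        while True:
            correlation_id, body = await self._replies.get()
            future = self._pending.get(correlation_id)
            if future is not None and not future.done():
                future.set_result(body)


async def profile_service(requests: asyncio.Queue, replies: asyncio.Queue) -> None:
    """Hypothetical responder: echoes the correlation ID back with each reply."""
    while True:
        correlation_id, user_id = await requests.get()
        await replies.put((correlation_id, f"profile:{user_id}"))


async def main() -> list:
    requests, replies = asyncio.Queue(), asyncio.Queue()
    client = RequestReplyClient(requests, replies)
    background = [
        asyncio.create_task(profile_service(requests, replies)),
        asyncio.create_task(client.dispatch_replies()),
    ]
    try:
        return await asyncio.gather(client.request("u1"), client.request("u2"))
    finally:
        for task in background:
            task.cancel()
```

Running `asyncio.run(main())` returns one reply per request, each matched by its correlation ID. Building and maintaining this bookkeeping on top of Kafka topics is the extra work that a broker with native request-reply, or plain HTTP/gRPC, avoids.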

- Confluent Kafka (Python): /confluentinc/confluent-kafka-python
- Temporal: /websites/temporal_io

Python:

    pip install confluent-kafka "celery[redis]" temporalio aio-pika redis

TypeScript/Node.js:

    npm install kafkajs bullmq @temporalio/client amqplib ioredis

Rust:

    cargo add rdkafka lapin async-nats redis

Go:

    go get github.com/confluentinc/confluent-kafka-go
    go get github.com/hibiken/asynq
    go get go.temporal.io/sdk

Use scripts for setup automation:

- python scripts/kafka_producer_consumer.py for test utilities
- python scripts/validate_message_schema.py to validate event schemas

Source: Enterprise Integration Patterns & Confluent Kafka Guide

| Stage | Validation |
|-------|-----------|
| 1 | Does out-of-order message delivery break the business logic? |
| 2 | Can the system withstand losing 24 hours of logs (durability)? |
| 3 | Does a poison message (a malformed message) block the system? |

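The third check can be made concrete. The sketch below assumes JSON-encoded messages and shows a consumer that parks a malformed message and moves on instead of failing on it repeatedly; `consume` and the dead-letter record shape are hypothetical.

```python
import json
from typing import Callable, Iterable


def consume(raw_messages: Iterable[str], handle: Callable[[dict], None], dead_letters: list) -> int:
    """Process messages in order; park malformed ones rather than blocking on them."""
    processed = 0
    for offset, raw in enumerate(raw_messages):
        try:
            event = json.loads(raw)
        except json.JSONDecodeError as exc:
            # Poison message: record it and advance past it.
            dead_letters.append({"offset": offset, "raw": raw, "error": str(exc)})
            continue
        handle(event)
        processed += 1
    return processed


seen: list = []
dead_letters: list = []
count = consume(['{"order_id": 1}', '{not json', '{"order_id": 2}'], seen.append, dead_letters)
```

The valid messages on either side of the malformed one are still handled in order, and the malformed one is kept with its offset for later inspection.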