skills/kafka-patterns/SKILL.md
Topic design, partition strategies, consumer group patterns, exactly-once processing, and dead letter queue handling.
npx skillsauth add rubicanjr/FinCognis kafka-patternsInstall this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
3 of 9 scanners reported clean
Some scanners were skipped, did not run, or reported a non-clean status. Review each row below.
Event streaming patterns for Apache Kafka in distributed systems.
# Topic naming convention: <domain>.<entity>.<event-type>
# Examples:
# orders.order.created
# payments.payment.completed
# inventory.stock.updated
# Topic configuration
topics:
orders.order.created:
partitions: 12 # Match expected consumer parallelism
replication-factor: 3 # Survive 2 broker failures
retention.ms: 604800000 # 7 days
cleanup.policy: delete
orders.order.changelog:
partitions: 12
replication-factor: 3
retention.ms: -1 # Infinite retention (compacted)
cleanup.policy: compact # Keep latest value per key
min.compaction.lag.ms: 3600000 # 1h before compacting
import { Kafka, Partitioners, CompressionTypes } from 'kafkajs'
const kafka = new Kafka({
clientId: 'order-service',
brokers: process.env.KAFKA_BROKERS!.split(','),
})
const producer = kafka.producer({
idempotent: true, // Exactly-once producer
maxInFlightRequests: 5, // Max parallel requests
createPartitioner: Partitioners.DefaultPartitioner,
})
await producer.connect()
// Key-based partitioning: same key always goes to same partition (ordering)
async function publishOrderEvent(order: Order, eventType: string): Promise<void> {
await producer.send({
topic: `orders.order.${eventType}`,
compression: CompressionTypes.LZ4,
messages: [{
key: order.id, // Orders for same ID → same partition → ordered
value: JSON.stringify({
eventId: crypto.randomUUID(), // Idempotency key
eventType,
timestamp: new Date().toISOString(),
data: order,
}),
headers: {
'content-type': 'application/json',
'source': 'order-service',
'correlation-id': order.correlationId,
},
}],
})
}
// Batch publishing for throughput
async function publishBatch(events: OrderEvent[]): Promise<void> {
await producer.sendBatch({
topicMessages: [{
topic: 'orders.order.created',
messages: events.map(e => ({
key: e.orderId,
value: JSON.stringify(e),
})),
}],
})
}
const consumer = kafka.consumer({
groupId: 'payment-processor', // Consumer group: shared topic consumption
sessionTimeout: 30000,
heartbeatInterval: 3000,
maxBytesPerPartition: 1048576, // 1MB per partition per fetch
retry: { retries: 5 },
})
await consumer.connect()
await consumer.subscribe({
topics: ['orders.order.created'],
fromBeginning: false, // Start from latest offset
})
await consumer.run({
autoCommit: false, // Manual commit for exactly-once
eachBatchAutoResolve: false,
eachBatch: async ({ batch, resolveOffset, commitOffsetsIfNecessary, heartbeat }) => {
for (const message of batch.messages) {
try {
const event = JSON.parse(message.value!.toString())
// Idempotency check: skip already processed events
if (await isAlreadyProcessed(event.eventId)) {
resolveOffset(message.offset)
continue
}
await processOrderPayment(event.data)
await markAsProcessed(event.eventId)
resolveOffset(message.offset)
await commitOffsetsIfNecessary()
await heartbeat()
} catch (err) {
console.error(`Failed to process message at offset ${message.offset}:`, err)
// Send to DLQ instead of blocking the partition
await sendToDeadLetterQueue(message, err as Error)
resolveOffset(message.offset)
}
}
},
})
const DLQ_TOPIC = 'orders.order.created.dlq'
async function sendToDeadLetterQueue(
originalMessage: KafkaMessage,
error: Error
): Promise<void> {
await producer.send({
topic: DLQ_TOPIC,
messages: [{
key: originalMessage.key,
value: originalMessage.value,
headers: {
...originalMessage.headers,
'dlq-reason': error.message,
'dlq-timestamp': new Date().toISOString(),
'dlq-original-topic': 'orders.order.created',
'dlq-retry-count': '0',
},
}],
})
}
// DLQ consumer: retry or alert
async function processDLQ(): Promise<void> {
const dlqConsumer = kafka.consumer({ groupId: 'dlq-processor' })
await dlqConsumer.subscribe({ topics: [DLQ_TOPIC] })
await dlqConsumer.run({
eachMessage: async ({ message }) => {
const retryCount = parseInt(
message.headers?.['dlq-retry-count']?.toString() ?? '0'
)
if (retryCount >= 3) {
// Max retries exceeded: alert ops team
await alertOps({
topic: DLQ_TOPIC,
key: message.key?.toString(),
reason: message.headers?.['dlq-reason']?.toString(),
retries: retryCount,
})
return
}
// Retry with incremented count
try {
const event = JSON.parse(message.value!.toString())
await processOrderPayment(event.data)
} catch (err) {
// Re-enqueue with incremented retry count
await producer.send({
topic: DLQ_TOPIC,
messages: [{
key: message.key,
value: message.value,
headers: {
...message.headers,
'dlq-retry-count': String(retryCount + 1),
},
}],
})
}
},
})
}
// Custom partitioner: route by region for data locality
const regionalPartitioner = () => ({
partition: ({ topic, partitionMetadata, message }) => {
const region = message.headers?.['region']?.toString() ?? 'default'
const regionMap: Record<string, number> = {
'us-east': 0, 'us-west': 1,
'eu-west': 2, 'eu-east': 3,
'ap-southeast': 4,
}
const partition = regionMap[region]
if (partition !== undefined && partition < partitionMetadata.length) {
return partition
}
// Fallback: hash the key
const numPartitions = partitionMetadata.length
const hash = murmurHash(message.key?.toString() ?? '')
return Math.abs(hash) % numPartitions
}
})
development
Goal-based workflow orchestration - routes tasks to specialist agents based on user goals
tools
Wiring Verification
development
Connection management, room patterns, reconnection strategies, message buffering, and binary protocol design.
development
Screenshot comparison QA for frontend development. Takes a screenshot of the current implementation, scores it across multiple visual dimensions, and returns a structured PASS/REVISE/FAIL verdict with concrete fixes. Use when implementing UI from a design reference or verifying visual correctness.