skills/azure-eventhub-ts/SKILL.md
Build event streaming applications using Azure Event Hubs SDK for JavaScript (@azure/event-hubs). Use when implementing high-throughput event ingestion, real-time analytics, IoT telemetry, or event...
npx skillsauth add voidomin/Param_Adventures_Phase2 azure-eventhub-tsInstall this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
3 of 9 scanners reported clean
Some scanners were skipped, did not run, or reported a non-clean status. Review each row below.
High-throughput event streaming and real-time data ingestion.
npm install @azure/event-hubs @azure/identity
For checkpointing with consumer groups:
npm install @azure/eventhubs-checkpointstore-blob @azure/storage-blob
EVENTHUB_NAMESPACE=<namespace>.servicebus.windows.net
EVENTHUB_NAME=my-eventhub
STORAGE_ACCOUNT_NAME=<storage-account>
STORAGE_CONTAINER_NAME=checkpoints
import { EventHubProducerClient, EventHubConsumerClient } from "@azure/event-hubs";
import { DefaultAzureCredential } from "@azure/identity";
const fullyQualifiedNamespace = process.env.EVENTHUB_NAMESPACE!;
const eventHubName = process.env.EVENTHUB_NAME!;
const credential = new DefaultAzureCredential();
// Producer
const producer = new EventHubProducerClient(fullyQualifiedNamespace, eventHubName, credential);
// Consumer
const consumer = new EventHubConsumerClient(
"$Default", // Consumer group
fullyQualifiedNamespace,
eventHubName,
credential
);
const producer = new EventHubProducerClient(namespace, eventHubName, credential);
// Create batch and add events
const batch = await producer.createBatch();
batch.tryAdd({ body: { temperature: 72.5, deviceId: "sensor-1" } });
batch.tryAdd({ body: { temperature: 68.2, deviceId: "sensor-2" } });
await producer.sendBatch(batch);
await producer.close();
// By partition ID
const batch = await producer.createBatch({ partitionId: "0" });
// By partition key (consistent hashing)
const batch = await producer.createBatch({ partitionKey: "device-123" });
const consumer = new EventHubConsumerClient("$Default", namespace, eventHubName, credential);
const subscription = consumer.subscribe({
processEvents: async (events, context) => {
for (const event of events) {
console.log(`Partition: ${context.partitionId}, Body: ${JSON.stringify(event.body)}`);
}
},
processError: async (err, context) => {
console.error(`Error on partition ${context.partitionId}: ${err.message}`);
},
});
// Stop after some time
setTimeout(async () => {
await subscription.close();
await consumer.close();
}, 60000);
import { EventHubConsumerClient } from "@azure/event-hubs";
import { ContainerClient } from "@azure/storage-blob";
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";
const containerClient = new ContainerClient(
`https://${storageAccount}.blob.core.windows.net/${containerName}`,
credential
);
const checkpointStore = new BlobCheckpointStore(containerClient);
const consumer = new EventHubConsumerClient(
"$Default",
namespace,
eventHubName,
credential,
checkpointStore
);
const subscription = consumer.subscribe({
processEvents: async (events, context) => {
for (const event of events) {
console.log(`Processing: ${JSON.stringify(event.body)}`);
}
// Checkpoint after processing batch
if (events.length > 0) {
await context.updateCheckpoint(events[events.length - 1]);
}
},
processError: async (err, context) => {
console.error(`Error: ${err.message}`);
},
});
const subscription = consumer.subscribe({
processEvents: async (events, context) => { /* ... */ },
processError: async (err, context) => { /* ... */ },
}, {
startPosition: {
// Start from beginning
"0": { offset: "@earliest" },
// Start from end (new events only)
"1": { offset: "@latest" },
// Start from specific offset
"2": { offset: "12345" },
// Start from specific time
"3": { enqueuedOn: new Date("2024-01-01") },
},
});
// Get hub info
const hubProperties = await producer.getEventHubProperties();
console.log(`Partitions: ${hubProperties.partitionIds}`);
// Get partition info
const partitionProperties = await producer.getPartitionProperties("0");
console.log(`Last sequence: ${partitionProperties.lastEnqueuedSequenceNumber}`);
const subscription = consumer.subscribe(
{
processEvents: async (events, context) => { /* ... */ },
processError: async (err, context) => { /* ... */ },
},
{
maxBatchSize: 100, // Max events per batch
maxWaitTimeInSeconds: 30, // Max wait for batch
}
);
import {
EventHubProducerClient,
EventHubConsumerClient,
EventData,
ReceivedEventData,
PartitionContext,
Subscription,
SubscriptionEventHandlers,
CreateBatchOptions,
EventPosition,
} from "@azure/event-hubs";
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";
// Send with properties
const batch = await producer.createBatch();
batch.tryAdd({
body: { data: "payload" },
properties: {
eventType: "telemetry",
deviceId: "sensor-1",
},
contentType: "application/json",
correlationId: "request-123",
});
// Access in receiver
consumer.subscribe({
processEvents: async (events, context) => {
for (const event of events) {
console.log(`Type: ${event.properties?.eventType}`);
console.log(`Sequence: ${event.sequenceNumber}`);
console.log(`Enqueued: ${event.enqueuedTimeUtc}`);
console.log(`Offset: ${event.offset}`);
}
},
});
consumer.subscribe({
processEvents: async (events, context) => {
try {
for (const event of events) {
await processEvent(event);
}
await context.updateCheckpoint(events[events.length - 1]);
} catch (error) {
// Don't checkpoint on error - events will be reprocessed
console.error("Processing failed:", error);
}
},
processError: async (err, context) => {
if (err.name === "MessagingError") {
// Transient error - SDK will retry
console.warn("Transient error:", err.message);
} else {
// Fatal error
console.error("Fatal error:", err);
}
},
});
createBatch() for efficient sendinglastEnqueuedSequenceNumber vs processed sequenceThis skill is applicable to execute the workflow or actions described in the overview.
documentation
Create beautiful visual art in .png and .pdf documents using design philosophy. You should use this skill when the user asks to create a poster, piece of art, design, or other static piece. Create ...
tools
Automate Canva tasks via Rube MCP (Composio): designs, exports, folders, brand templates, autofill. Always search tools first for current schemas.
tools
Automate Calendly scheduling, event management, invitee tracking, availability checks, and organization administration via Rube MCP (Composio). Always search tools first for current schemas.
tools
Automate Cal.com tasks via Rube MCP (Composio): manage bookings, check availability, configure webhooks, and handle teams. Always search tools first for current schemas.