skills_antigravity/skills/azure-eventhub-java/SKILL.md
Build real-time streaming applications with Azure Event Hubs SDK for Java. Use when implementing event streaming, high-throughput data ingestion, or building event-driven architectures.
npx skillsauth add alexsander532/atlas azure-eventhub-java
Install this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
3 of 9 scanners reported clean
Some scanners were skipped, did not run, or reported a non-clean status. Review each row below.
Build real-time streaming applications using the Azure Event Hubs SDK for Java.
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs</artifactId>
<version>5.19.0</version>
</dependency>
<!-- For checkpoint store (production) -->
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
<version>1.20.0</version>
</dependency>
import com.azure.messaging.eventhubs.EventHubProducerClient;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
// With connection string
EventHubProducerClient producer = new EventHubClientBuilder()
.connectionString("<connection-string>", "<event-hub-name>")
.buildProducerClient();
// Full connection string with EntityPath
EventHubProducerClient producer = new EventHubClientBuilder()
.connectionString("<connection-string-with-entity-path>")
.buildProducerClient();
import com.azure.identity.DefaultAzureCredentialBuilder;
EventHubProducerClient producer = new EventHubClientBuilder()
.fullyQualifiedNamespace("<namespace>.servicebus.windows.net")
.eventHubName("<event-hub-name>")
.credential(new DefaultAzureCredentialBuilder().build())
.buildProducerClient();
import com.azure.messaging.eventhubs.EventHubConsumerClient;
EventHubConsumerClient consumer = new EventHubClientBuilder()
.connectionString("<connection-string>", "<event-hub-name>")
.consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
.buildConsumerClient();
import com.azure.messaging.eventhubs.EventHubProducerAsyncClient;
import com.azure.messaging.eventhubs.EventHubConsumerAsyncClient;
EventHubProducerAsyncClient asyncProducer = new EventHubClientBuilder()
.connectionString("<connection-string>", "<event-hub-name>")
.buildAsyncProducerClient();
EventHubConsumerAsyncClient asyncConsumer = new EventHubClientBuilder()
.connectionString("<connection-string>", "<event-hub-name>")
.consumerGroup("$Default")
.buildAsyncConsumerClient();
import com.azure.messaging.eventhubs.EventData;
EventData eventData = new EventData("Hello, Event Hubs!");
producer.send(Collections.singletonList(eventData));
import com.azure.messaging.eventhubs.EventDataBatch;
import com.azure.messaging.eventhubs.models.CreateBatchOptions;
// Create batch
EventDataBatch batch = producer.createBatch();
// Add events (tryAdd returns false when the event does not fit in the remaining space)
for (int i = 0; i < 100; i++) {
    EventData event = new EventData("Event " + i);
    if (!batch.tryAdd(event)) {
        // Batch is full, send and create new batch
        producer.send(batch);
        batch = producer.createBatch();
        // Retry on the fresh batch. If the event does not fit even into an empty batch
        // it can never be sent, so fail loudly instead of silently dropping it.
        if (!batch.tryAdd(event)) {
            throw new IllegalArgumentException("Event " + i
                + " is too large for an empty batch. Max size: "
                + batch.getMaxSizeInBytes() + " bytes");
        }
    }
}
// Send remaining events
if (batch.getCount() > 0) {
    producer.send(batch);
}
CreateBatchOptions options = new CreateBatchOptions()
.setPartitionId("0");
EventDataBatch batch = producer.createBatch(options);
batch.tryAdd(new EventData("Partition 0 event"));
producer.send(batch);
CreateBatchOptions options = new CreateBatchOptions()
.setPartitionKey("customer-123");
EventDataBatch batch = producer.createBatch(options);
batch.tryAdd(new EventData("Customer event"));
producer.send(batch);
EventData event = new EventData("Order created");
event.getProperties().put("orderId", "ORD-123");
event.getProperties().put("customerId", "CUST-456");
event.getProperties().put("priority", 1);
producer.send(Collections.singletonList(event));
import com.azure.messaging.eventhubs.models.EventPosition;
import com.azure.messaging.eventhubs.models.PartitionEvent;
// Receive from specific partition
Iterable<PartitionEvent> events = consumer.receiveFromPartition(
"0", // partitionId
10, // maxEvents
EventPosition.earliest(), // startingPosition
Duration.ofSeconds(30) // timeout
);
for (PartitionEvent partitionEvent : events) {
EventData event = partitionEvent.getData();
System.out.println("Body: " + event.getBodyAsString());
System.out.println("Sequence: " + event.getSequenceNumber());
System.out.println("Offset: " + event.getOffset());
}
import com.azure.messaging.eventhubs.EventProcessorClient;
import com.azure.messaging.eventhubs.EventProcessorClientBuilder;
import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.BlobContainerClientBuilder;
// Create checkpoint store
BlobContainerAsyncClient blobClient = new BlobContainerClientBuilder()
.connectionString("<storage-connection-string>")
.containerName("checkpoints")
.buildAsyncClient();
// Create processor
EventProcessorClient processor = new EventProcessorClientBuilder()
.connectionString("<eventhub-connection-string>", "<event-hub-name>")
.consumerGroup("$Default")
.checkpointStore(new BlobCheckpointStore(blobClient))
.processEvent(eventContext -> {
EventData event = eventContext.getEventData();
System.out.println("Processing: " + event.getBodyAsString());
// Checkpoint after processing
eventContext.updateCheckpoint();
})
.processError(errorContext -> {
System.err.println("Error: " + errorContext.getThrowable().getMessage());
System.err.println("Partition: " + errorContext.getPartitionContext().getPartitionId());
})
.buildEventProcessorClient();
// Start processing
processor.start();
// Keep running...
Thread.sleep(Duration.ofMinutes(5).toMillis());
// Stop gracefully
processor.stop();
EventProcessorClient processor = new EventProcessorClientBuilder()
.connectionString("<connection-string>", "<event-hub-name>")
.consumerGroup("$Default")
.checkpointStore(new BlobCheckpointStore(blobClient))
.processEventBatch(eventBatchContext -> {
List<EventData> events = eventBatchContext.getEvents();
System.out.printf("Received %d events%n", events.size());
for (EventData event : events) {
// Process each event
System.out.println(event.getBodyAsString());
}
// Checkpoint after batch
eventBatchContext.updateCheckpoint();
}, 50) // maxBatchSize
.processError(errorContext -> {
System.err.println("Error: " + errorContext.getThrowable());
})
.buildEventProcessorClient();
asyncConsumer.receiveFromPartition("0", EventPosition.latest())
.subscribe(
partitionEvent -> {
EventData event = partitionEvent.getData();
System.out.println("Received: " + event.getBodyAsString());
},
error -> System.err.println("Error: " + error),
() -> System.out.println("Complete")
);
// Get hub info
EventHubProperties hubProps = producer.getEventHubProperties();
System.out.println("Hub: " + hubProps.getName());
System.out.println("Partitions: " + hubProps.getPartitionIds());
// Get partition info
PartitionProperties partitionProps = producer.getPartitionProperties("0");
System.out.println("Begin sequence: " + partitionProps.getBeginningSequenceNumber());
System.out.println("Last sequence: " + partitionProps.getLastEnqueuedSequenceNumber());
System.out.println("Last offset: " + partitionProps.getLastEnqueuedOffset());
// Start from beginning
EventPosition.earliest()
// Start from end (new events only)
EventPosition.latest()
// From specific offset
EventPosition.fromOffset(12345L)
// From specific sequence number
EventPosition.fromSequenceNumber(100L)
// From specific time
EventPosition.fromEnqueuedTime(Instant.now().minus(Duration.ofHours(1)))
import com.azure.core.amqp.exception.AmqpException;
import com.azure.messaging.eventhubs.models.ErrorContext;
// Error handler registered on EventProcessorClientBuilder; called with an ErrorContext
// carrying the failure and the partition it relates to.
.processError(errorContext -> {
    Throwable error = errorContext.getThrowable();
    String partitionId = errorContext.getPartitionContext().getPartitionId();
    // AmqpException exposes isTransient(), which this handler uses to tell
    // transient failures apart from the rest.
    if (error instanceof AmqpException) {
        AmqpException amqpError = (AmqpException) error;
        if (amqpError.isTransient()) {
            System.out.println("Transient error, will retry");
        }
    }
    // Always record the failure together with the partition it occurred on.
    System.err.printf("Error on partition %s: %s%n", partitionId, error.getMessage());
})
// Always close clients
try {
producer.send(batch);
} finally {
producer.close();
}
// Or use try-with-resources
try (EventHubProducerClient producer = new EventHubClientBuilder()
.connectionString(connectionString, eventHubName)
.buildProducerClient()) {
producer.send(events);
}
EVENT_HUBS_CONNECTION_STRING=Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=...
EVENT_HUBS_NAME=<event-hub-name>
STORAGE_CONNECTION_STRING=<for-checkpointing>
EventDataBatch for efficient sending
tools
Multi-agent autonomous startup system for Claude Code. Triggers on "Loki Mode". Orchestrates 100+ specialized agents across engineering, QA, DevOps, security, data/ML, business operations, marketing, HR, and customer success. Takes PRD to fully deployed, revenue-generating product with zero human intervention. Features Task tool for subagent dispatch, parallel code review with 3 specialized reviewers, severity-based issue triage, distributed task queue with dead letter handling, automatic deployment to cloud providers, A/B testing, customer feedback loops, incident response, circuit breakers, and self-healing. Handles rate limits via distributed state checkpoints and auto-resume with exponential backoff. Requires --dangerously-skip-permissions flag.
development
Best practices for Remotion - Video creation in React
content-media
When the user wants to create, optimize, or analyze a referral program, affiliate program, or word-of-mouth strategy. Also use when the user mentions 'referral,' 'affiliate,' 'ambassador,' 'word of mouth,' 'viral loop,' 'refer a friend,' or 'partner program.' This skill covers program design, incentive structure, and growth optimization.
development
Creates exhaustive technical references and API documentation. Generates comprehensive parameter listings, configuration guides, and searchable reference materials. Use PROACTIVELY for API docs, configuration references, or complete technical specifications.