skills/codex/azure-servicebus-py/SKILL.md
<!-- AUTO-GENERATED by export-skills.py — DO NOT EDIT --> --- name: azure-servicebus-py description: "Azure Service Bus SDK for Python" --- # Azure Service Bus SDK for Python Enterprise messaging for reliable cloud communication with queues and pub/sub topics. ## Installation ```bash pip install azure-servicebus azure-identity ``` ## Environment Variables ```bash SERVICEBUS_FULLY_QUALIFIED_NAMESPACE=<namespace>.servicebus.windows.net SERVICEBUS_QUEUE_NAME=myqueue SERVICEBUS_TOPIC_NAME=myto
npx skillsauth add frank-luongt/faos-skills-marketplace skills/codex/azure-servicebus-pyInstall this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
3 of 9 scanners reported clean
Some scanners were skipped, did not run, or reported a non-clean status. Review each row below.
Enterprise messaging for reliable cloud communication with queues and pub/sub topics.
pip install azure-servicebus azure-identity
SERVICEBUS_FULLY_QUALIFIED_NAMESPACE=<namespace>.servicebus.windows.net
SERVICEBUS_QUEUE_NAME=myqueue
SERVICEBUS_TOPIC_NAME=mytopic
SERVICEBUS_SUBSCRIPTION_NAME=mysubscription
from azure.identity import DefaultAzureCredential
from azure.servicebus import ServiceBusClient
credential = DefaultAzureCredential()
namespace = "<namespace>.servicebus.windows.net"
client = ServiceBusClient(
fully_qualified_namespace=namespace,
credential=credential
)
| Client | Purpose | Get From |
|--------|---------|----------|
| ServiceBusClient | Connection management | Direct instantiation |
| ServiceBusSender | Send messages | client.get_queue_sender() / get_topic_sender() |
| ServiceBusReceiver | Receive messages | client.get_queue_receiver() / get_subscription_receiver() |
import asyncio
from azure.servicebus.aio import ServiceBusClient
from azure.servicebus import ServiceBusMessage
from azure.identity.aio import DefaultAzureCredential
async def send_messages():
credential = DefaultAzureCredential()
async with ServiceBusClient(
fully_qualified_namespace="<namespace>.servicebus.windows.net",
credential=credential
) as client:
sender = client.get_queue_sender(queue_name="myqueue")
async with sender:
# Single message
message = ServiceBusMessage("Hello, Service Bus!")
await sender.send_messages(message)
# Batch of messages
messages = [ServiceBusMessage(f"Message {i}") for i in range(10)]
await sender.send_messages(messages)
# Message batch (for size control)
batch = await sender.create_message_batch()
for i in range(100):
try:
batch.add_message(ServiceBusMessage(f"Batch message {i}"))
except ValueError: # Batch full
await sender.send_messages(batch)
batch = await sender.create_message_batch()
batch.add_message(ServiceBusMessage(f"Batch message {i}"))
await sender.send_messages(batch)
asyncio.run(send_messages())
async def receive_messages():
credential = DefaultAzureCredential()
async with ServiceBusClient(
fully_qualified_namespace="<namespace>.servicebus.windows.net",
credential=credential
) as client:
receiver = client.get_queue_receiver(queue_name="myqueue")
async with receiver:
# Receive batch
messages = await receiver.receive_messages(
max_message_count=10,
max_wait_time=5 # seconds
)
for msg in messages:
print(f"Received: {str(msg)}")
await receiver.complete_message(msg) # Remove from queue
asyncio.run(receive_messages())
| Mode | Behavior | Use Case |
|------|----------|----------|
| PEEK_LOCK (default) | Message locked, must complete/abandon | Reliable processing |
| RECEIVE_AND_DELETE | Removed immediately on receive | At-most-once delivery |
from azure.servicebus import ServiceBusReceiveMode
receiver = client.get_queue_receiver(
queue_name="myqueue",
receive_mode=ServiceBusReceiveMode.RECEIVE_AND_DELETE
)
async with receiver:
messages = await receiver.receive_messages(max_message_count=1)
for msg in messages:
try:
# Process message...
await receiver.complete_message(msg) # Success - remove from queue
except ProcessingError:
await receiver.abandon_message(msg) # Retry later
except PermanentError:
await receiver.dead_letter_message(
msg,
reason="ProcessingFailed",
error_description="Could not process"
)
| Action | Effect |
|--------|--------|
| complete_message() | Remove from queue (success) |
| abandon_message() | Release lock, retry immediately |
| dead_letter_message() | Move to dead-letter queue |
| defer_message() | Set aside, receive by sequence number |
# Send to topic
sender = client.get_topic_sender(topic_name="mytopic")
async with sender:
await sender.send_messages(ServiceBusMessage("Topic message"))
# Receive from subscription
receiver = client.get_subscription_receiver(
topic_name="mytopic",
subscription_name="mysubscription"
)
async with receiver:
messages = await receiver.receive_messages(max_message_count=10)
# Send with session
message = ServiceBusMessage("Session message")
message.session_id = "order-123"
await sender.send_messages(message)
# Receive from specific session
receiver = client.get_queue_receiver(
queue_name="session-queue",
session_id="order-123"
)
# Receive from next available session
from azure.servicebus import NEXT_AVAILABLE_SESSION
receiver = client.get_queue_receiver(
queue_name="session-queue",
session_id=NEXT_AVAILABLE_SESSION
)
from datetime import datetime, timedelta, timezone
message = ServiceBusMessage("Scheduled message")
scheduled_time = datetime.now(timezone.utc) + timedelta(minutes=10)
# Schedule message
sequence_number = await sender.schedule_messages(message, scheduled_time)
# Cancel scheduled message
await sender.cancel_scheduled_messages(sequence_number)
from azure.servicebus import ServiceBusSubQueue
# Receive from dead-letter queue
dlq_receiver = client.get_queue_receiver(
queue_name="myqueue",
sub_queue=ServiceBusSubQueue.DEAD_LETTER
)
async with dlq_receiver:
messages = await dlq_receiver.receive_messages(max_message_count=10)
for msg in messages:
print(f"Dead-lettered: {msg.dead_letter_reason}")
await dlq_receiver.complete_message(msg)
from azure.servicebus import ServiceBusClient, ServiceBusMessage
from azure.identity import DefaultAzureCredential
with ServiceBusClient(
fully_qualified_namespace="<namespace>.servicebus.windows.net",
credential=DefaultAzureCredential()
) as client:
with client.get_queue_sender("myqueue") as sender:
sender.send_messages(ServiceBusMessage("Sync message"))
with client.get_queue_receiver("myqueue") as receiver:
for msg in receiver:
print(str(msg))
receiver.complete_message(msg)
async with) for proper cleanupmax_wait_time to avoid infinite blocking| File | Contents | |------|----------| | references/patterns.md | Competing consumers, sessions, retry patterns, request-response, transactions | | references/dead-letter.md | DLQ handling, poison messages, reprocessing strategies | | scripts/setup_servicebus.py | CLI for queue/topic/subscription management and DLQ monitoring |
<!-- Source: .faos/custom/skills/cloud/azure/messaging/azure-servicebus-py/SKILL.md -->development
<!-- AUTO-GENERATED by export-skills.py — DO NOT EDIT --> --- name: databricks-mlflow-evaluation --- # MLflow 3 GenAI Evaluation ## Before Writing Any Code 1. **Read GOTCHAS.md** - 15+ common mistakes that cause failures 2. **Read CRITICAL-interfaces.md** - Exact API signatures and data schemas ## End-to-End Workflows Follow these workflows based on your goal. Each step indicates which reference files to read. ### Workflow 1: First-Time Evaluation Setup For users new to MLflow GenAI evalu
development
<!-- AUTO-GENERATED by export-skills.py — DO NOT EDIT --> --- name: databricks-lakebase-provisioned --- # Lakebase Provisioned Patterns and best practices for using Lakebase Provisioned (Databricks managed PostgreSQL) for OLTP workloads. ## When to Use Use this skill when: - Building applications that need a PostgreSQL database for transactional workloads - Adding persistent state to Databricks Apps - Implementing reverse ETL from Delta Lake to an operational database - Storing chat/agent m
tools
<!-- AUTO-GENERATED by export-skills.py — DO NOT EDIT --> --- name: databricks-jobs --- # Databricks Lakeflow Jobs ## Overview Databricks Jobs orchestrate data workflows with multi-task DAGs, flexible triggers, and comprehensive monitoring. Jobs support diverse task types and can be managed via Python SDK, CLI, or Asset Bundles. ## Reference Files | Use Case | Reference File | | ----------------------
development
<!-- AUTO-GENERATED by export-skills.py — DO NOT EDIT --> --- name: databricks-genie --- # Databricks Genie Create and query Databricks Genie Spaces - natural language interfaces for SQL-based data exploration. ## Overview Genie Spaces allow users to ask natural language questions about structured data in Unity Catalog. The system translates questions into SQL queries, executes them on a SQL warehouse, and presents results conversationally. ## When to Use This Skill Use this skill when: -