skills/azure-messaging-webpubsubservice-py/SKILL.md
--- name: azure-messaging-webpubsubservice-py description: "|" Azure Web PubSub Service SDK for Python. Use for real-time messaging, WebSocket connections, and pub/sub patterns. Triggers: "azure-messaging-webpubsubservice", "WebPubSubServiceClient", "real-time", "WebSocket", "pub/sub". package: azure-messaging-webpubsubservice risk: unknown source: community --- # Azure Web PubSub Service SDK for Python Real-time messaging with WebSocket connections at scale. ## Installation ```bash # Se
npx skillsauth add luismarinoc/antigravity-awesome-skills skills/azure-messaging-webpubsubservice-pyInstall this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
3 of 9 scanners reported clean
Some scanners were skipped, did not run, or reported a non-clean status. Review each row below.
Real-time messaging with WebSocket connections at scale.
# Service SDK (server-side)
pip install azure-messaging-webpubsubservice
# Client SDK (for Python WebSocket clients)
pip install azure-messaging-webpubsubclient
AZURE_WEBPUBSUB_CONNECTION_STRING=Endpoint=https://<name>.webpubsub.azure.com;AccessKey=...
AZURE_WEBPUBSUB_HUB=my-hub
from azure.messaging.webpubsubservice import WebPubSubServiceClient
# Connection string
client = WebPubSubServiceClient.from_connection_string(
connection_string=os.environ["AZURE_WEBPUBSUB_CONNECTION_STRING"],
hub="my-hub"
)
# Entra ID
from azure.identity import DefaultAzureCredential
client = WebPubSubServiceClient(
endpoint="https://<name>.webpubsub.azure.com",
hub="my-hub",
credential=DefaultAzureCredential()
)
# Token for anonymous user
token = client.get_client_access_token()
print(f"URL: {token['url']}")
# Token with user ID
token = client.get_client_access_token(
user_id="user123",
roles=["webpubsub.sendToGroup", "webpubsub.joinLeaveGroup"]
)
# Token with groups
token = client.get_client_access_token(
user_id="user123",
groups=["group1", "group2"]
)
# Send text
client.send_to_all(message="Hello everyone!", content_type="text/plain")
# Send JSON
client.send_to_all(
message={"type": "notification", "data": "Hello"},
content_type="application/json"
)
client.send_to_user(
user_id="user123",
message="Hello user!",
content_type="text/plain"
)
client.send_to_group(
group="my-group",
message="Hello group!",
content_type="text/plain"
)
client.send_to_connection(
connection_id="abc123",
message="Hello connection!",
content_type="text/plain"
)
# Add user to group
client.add_user_to_group(group="my-group", user_id="user123")
# Remove user from group
client.remove_user_from_group(group="my-group", user_id="user123")
# Add connection to group
client.add_connection_to_group(group="my-group", connection_id="abc123")
# Remove connection from group
client.remove_connection_from_group(group="my-group", connection_id="abc123")
# Check if connection exists
exists = client.connection_exists(connection_id="abc123")
# Check if user has connections
exists = client.user_exists(user_id="user123")
# Check if group has connections
exists = client.group_exists(group="my-group")
# Close connection
client.close_connection(connection_id="abc123", reason="Session ended")
# Close all connections for user
client.close_all_connections(user_id="user123")
from azure.messaging.webpubsubservice import WebPubSubServiceClient
# Grant permission
client.grant_permission(
permission="joinLeaveGroup",
connection_id="abc123",
target_name="my-group"
)
# Revoke permission
client.revoke_permission(
permission="joinLeaveGroup",
connection_id="abc123",
target_name="my-group"
)
# Check permission
has_permission = client.check_permission(
permission="joinLeaveGroup",
connection_id="abc123",
target_name="my-group"
)
from azure.messaging.webpubsubclient import WebPubSubClient
client = WebPubSubClient(credential=token["url"])
# Event handlers
@client.on("connected")
def on_connected(e):
print(f"Connected: {e.connection_id}")
@client.on("server-message")
def on_message(e):
print(f"Message: {e.data}")
@client.on("group-message")
def on_group_message(e):
print(f"Group {e.group}: {e.data}")
# Connect and send
client.open()
client.send_to_group("my-group", "Hello from Python!")
from azure.messaging.webpubsubservice.aio import WebPubSubServiceClient
from azure.identity.aio import DefaultAzureCredential
async def broadcast():
credential = DefaultAzureCredential()
client = WebPubSubServiceClient(
endpoint="https://<name>.webpubsub.azure.com",
hub="my-hub",
credential=credential
)
await client.send_to_all("Hello async!", content_type="text/plain")
await client.close()
await credential.close()
| Operation | Description |
|-----------|-------------|
| get_client_access_token | Generate WebSocket connection URL |
| send_to_all | Broadcast to all connections |
| send_to_user | Send to specific user |
| send_to_group | Send to group members |
| send_to_connection | Send to specific connection |
| add_user_to_group | Add user to group |
| remove_user_from_group | Remove user from group |
| close_connection | Disconnect client |
| connection_exists | Check connection status |
This skill is applicable to execute the workflow or actions described in the overview.
testing
This skill should be used when the user asks to "perform cloud penetration testing", "assess Azure or AWS or GCP security", "enumerate cloud resources", "exploit cloud misconfiguratio...
testing
--- name: cloud-architect description: "Expert cloud architect specializing in AWS/Azure/GCP multi-cloud" infrastructure design, advanced IaC (Terraform/OpenTofu/CDK), FinOps cost optimization, and modern architectural patterns. Masters serverless, microservices, security, compliance, and disaster recovery. Use PROACTIVELY for cloud architecture, cost optimization, migration planning, or multi-cloud strategies. metadata: model: opus risk: unknown source: community --- ## Use this sk
tools
Automate Close CRM tasks via Rube MCP (Composio): create leads, manage calls/SMS, handle tasks, and track notes. Always search tools first for current schemas.
tools
Automate ClickUp project management including tasks, spaces, folders, lists, comments, and team operations via Rube MCP (Composio). Always search tools first for current schemas.