skills/azure-monitor-ingestion-py/SKILL.md
Azure Monitor Ingestion SDK for Python. Use for sending custom logs to Log Analytics workspace via Logs Ingestion API. Triggers: "azure-monitor-ingestion", "LogsIngestionClient", "custom logs", "DCR", "data collection rule", "Log Analytics".
npx skillsauth add athility/krashitos-ai-os-portfolio azure-monitor-ingestion-pyInstall this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
3 of 9 scanners reported clean
Some scanners were skipped, did not run, or reported a non-clean status. Review each row below.
Send custom logs to Azure Monitor Log Analytics workspace using the Logs Ingestion API.
pip install azure-monitor-ingestion
pip install azure-identity
# Data Collection Endpoint (DCE)
AZURE_DCE_ENDPOINT=https://<dce-name>.<region>.ingest.monitor.azure.com
# Data Collection Rule (DCR) immutable ID
AZURE_DCR_RULE_ID=dcr-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
# Stream name from DCR
AZURE_DCR_STREAM_NAME=Custom-MyTable_CL
Before using this SDK, you need:
from azure.monitor.ingestion import LogsIngestionClient
from azure.identity import DefaultAzureCredential
import os
client = LogsIngestionClient(
endpoint=os.environ["AZURE_DCE_ENDPOINT"],
credential=DefaultAzureCredential()
)
from azure.monitor.ingestion import LogsIngestionClient
from azure.identity import DefaultAzureCredential
import os
client = LogsIngestionClient(
endpoint=os.environ["AZURE_DCE_ENDPOINT"],
credential=DefaultAzureCredential()
)
rule_id = os.environ["AZURE_DCR_RULE_ID"]
stream_name = os.environ["AZURE_DCR_STREAM_NAME"]
logs = [
{"TimeGenerated": "2024-01-15T10:00:00Z", "Computer": "server1", "Message": "Application started"},
{"TimeGenerated": "2024-01-15T10:01:00Z", "Computer": "server1", "Message": "Processing request"},
{"TimeGenerated": "2024-01-15T10:02:00Z", "Computer": "server2", "Message": "Connection established"}
]
client.upload(rule_id=rule_id, stream_name=stream_name, logs=logs)
import json
with open("logs.json", "r") as f:
logs = json.load(f)
client.upload(rule_id=rule_id, stream_name=stream_name, logs=logs)
Handle partial failures with a callback:
failed_logs = []
def on_error(error):
print(f"Upload failed: {error.error}")
failed_logs.extend(error.failed_logs)
client.upload(
rule_id=rule_id,
stream_name=stream_name,
logs=logs,
on_error=on_error
)
# Retry failed logs
if failed_logs:
print(f"Retrying {len(failed_logs)} failed logs...")
client.upload(rule_id=rule_id, stream_name=stream_name, logs=failed_logs)
def ignore_errors(error):
pass # Silently ignore upload failures
client.upload(
rule_id=rule_id,
stream_name=stream_name,
logs=logs,
on_error=ignore_errors
)
import asyncio
from azure.monitor.ingestion.aio import LogsIngestionClient
from azure.identity.aio import DefaultAzureCredential
async def upload_logs():
async with LogsIngestionClient(
endpoint=endpoint,
credential=DefaultAzureCredential()
) as client:
await client.upload(
rule_id=rule_id,
stream_name=stream_name,
logs=logs
)
asyncio.run(upload_logs())
from azure.identity import AzureAuthorityHosts, DefaultAzureCredential
from azure.monitor.ingestion import LogsIngestionClient
# Azure Government
credential = DefaultAzureCredential(authority=AzureAuthorityHosts.AZURE_GOVERNMENT)
client = LogsIngestionClient(
endpoint="https://example.ingest.monitor.azure.us",
credential=credential,
credential_scopes=["https://monitor.azure.us/.default"]
)
The SDK automatically:
No manual batching needed for large log sets.
| Client | Purpose |
|--------|---------|
| LogsIngestionClient | Sync client for uploading logs |
| LogsIngestionClient (aio) | Async client for uploading logs |
| Concept | Description |
|---------|-------------|
| DCE | Data Collection Endpoint — ingestion URL |
| DCR | Data Collection Rule — defines schema, transformations, destination |
| Stream | Named data flow within a DCR |
| Custom Table | Target table in Log Analytics (ends with _CL) |
Stream names follow patterns:
Custom-<TableName>_CL — For custom tablesMicrosoft-<TableName> — For built-in tableson_error callback for partial failuresdevelopment
Create Zustand stores with TypeScript, subscribeWithSelector middleware, and proper state/action separation. Use when building React state management, creating global stores, or implementing reactive state patterns with Zustand.
tools
Automate Zoom meeting creation, management, recordings, webinars, and participant tracking via Rube MCP (Composio). Always search tools first for current schemas.
tools
Automate Zoho CRM tasks via Rube MCP (Composio): create/update records, search contacts, manage leads, and convert leads. Always search tools first for current schemas.
tools
Automate Zendesk tasks via Rube MCP (Composio): tickets, users, organizations, replies. Always search tools first for current schemas.