skills/cjharmath/py-observability/SKILL.md
Observability patterns for Python backends. Use when adding logging, metrics, tracing, or debugging production issues.
npx skillsauth add aiskillstore/marketplace py-observabilityInstall this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
3 of 9 scanners reported clean
Some scanners were skipped, did not run, or reported a non-clean status. Review each row below.
Production issues are impossible to debug without observability. Logging, metrics, and tracing must be built in from the start. Silent failures, missing context in errors, and lack of metrics make incidents last longer.
Problem: Unstructured logs are hard to search and analyze.
# ❌ BAD: Unstructured logging
import logging
logger = logging.getLogger(__name__)
logger.info(f"User {user_id} started assessment {assessment_id}")
logger.error(f"Failed to save answer: {error}")
# ✅ GOOD: Structured logging with structlog
import structlog
logger = structlog.get_logger()
logger.info(
"assessment_started",
user_id=str(user_id),
assessment_id=str(assessment_id),
)
logger.error(
"answer_save_failed",
user_id=str(user_id),
question_id=str(question_id),
error=str(error),
error_type=type(error).__name__,
)
# app/core/logging.py
import structlog
import logging
import sys
def setup_logging(json_logs: bool = True, log_level: str = "INFO"):
"""Configure structured logging."""
# Shared processors
shared_processors = [
structlog.contextvars.merge_contextvars,
structlog.processors.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
]
if json_logs:
# JSON for production (machine-readable)
processors = shared_processors + [
structlog.processors.format_exc_info,
structlog.processors.JSONRenderer(),
]
else:
# Pretty for development (human-readable)
processors = shared_processors + [
structlog.dev.ConsoleRenderer(),
]
structlog.configure(
processors=processors,
wrapper_class=structlog.make_filtering_bound_logger(
logging.getLevelName(log_level)
),
context_class=dict,
logger_factory=structlog.PrintLoggerFactory(),
cache_logger_on_first_use=True,
)
# Call at startup
setup_logging(json_logs=not settings.DEBUG)
Problem: Logs from same request aren't correlated.
import structlog
from contextvars import ContextVar
from uuid import uuid4
from fastapi import Request
request_id_var: ContextVar[str] = ContextVar("request_id", default="")
# Middleware to set request context
@app.middleware("http")
async def add_request_context(request: Request, call_next):
request_id = str(uuid4())[:8]
request_id_var.set(request_id)
# Bind to all logs in this request
structlog.contextvars.bind_contextvars(
request_id=request_id,
path=request.url.path,
method=request.method,
)
try:
response = await call_next(request)
return response
finally:
structlog.contextvars.unbind_contextvars(
"request_id", "path", "method"
)
# Now all logs automatically include request_id
logger.info("processing_assessment") # Includes request_id, path, method
logger = structlog.get_logger()
# DEBUG: Detailed diagnostic info (dev only)
logger.debug("query_executed", sql=str(query), params=params)
# INFO: Business events, successful operations
logger.info("assessment_submitted", user_id=user_id, score=score)
# WARNING: Unexpected but handled conditions
logger.warning(
"rate_limit_approaching",
user_id=user_id,
current=current_count,
limit=rate_limit,
)
# ERROR: Failures that need attention
logger.error(
"payment_failed",
user_id=user_id,
error=str(error),
payment_id=payment_id,
)
# CRITICAL: System-level failures
logger.critical(
"database_connection_failed",
error=str(error),
host=db_host,
)
Same principle as frontend - every early return should log:
# ❌ BAD: Silent early return
async def save_answer(user_id: UUID, question_id: UUID, value: int):
if not await is_valid_question(question_id):
return None # Why did we return? No one knows.
# ✅ GOOD: Observable early return
async def save_answer(user_id: UUID, question_id: UUID, value: int):
if not await is_valid_question(question_id):
logger.warning(
"save_answer_skipped",
reason="invalid_question",
user_id=str(user_id),
question_id=str(question_id),
)
return None
# ❌ BAD: Error without context
try:
result = await risky_operation()
except Exception as e:
logger.error(f"Operation failed: {e}")
raise
# ✅ GOOD: Error with full context
try:
result = await risky_operation(user_id, assessment_id)
except Exception as e:
logger.exception(
"risky_operation_failed",
user_id=str(user_id),
assessment_id=str(assessment_id),
error_type=type(e).__name__,
error_message=str(e),
)
raise
# app/core/metrics.py
from prometheus_client import Counter, Histogram, Gauge, generate_latest
from fastapi import Response
# Counters - things that only go up
http_requests_total = Counter(
"http_requests_total",
"Total HTTP requests",
["method", "path", "status"],
)
assessment_submissions = Counter(
"assessment_submissions_total",
"Total assessment submissions",
["skill_area", "status"], # status: success, validation_error, etc.
)
# Histograms - distribution of values
request_duration = Histogram(
"http_request_duration_seconds",
"HTTP request duration",
["method", "path"],
buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 5.0],
)
db_query_duration = Histogram(
"db_query_duration_seconds",
"Database query duration",
["query_type"], # select, insert, update
buckets=[0.001, 0.01, 0.1, 0.5, 1.0],
)
# Gauges - values that go up and down
active_connections = Gauge(
"db_active_connections",
"Active database connections",
)
# Endpoint to expose metrics
@app.get("/metrics")
async def metrics():
return Response(
content=generate_latest(),
media_type="text/plain",
)
import time
# Middleware for HTTP metrics
@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
start = time.perf_counter()
response = await call_next(request)
duration = time.perf_counter() - start
request_duration.labels(
method=request.method,
path=request.url.path,
).observe(duration)
http_requests_total.labels(
method=request.method,
path=request.url.path,
status=response.status_code,
).inc()
return response
# In business logic
async def submit_assessment(assessment_id: UUID, session: AsyncSession):
try:
result = await _process_submission(assessment_id, session)
assessment_submissions.labels(
skill_area=result.skill_area,
status="success",
).inc()
return result
except ValidationError:
assessment_submissions.labels(
skill_area="unknown",
status="validation_error",
).inc()
raise
# app/core/sentry.py
import sentry_sdk
from sentry_sdk.integrations.fastapi import FastApiIntegration
from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration
def setup_sentry(dsn: str, environment: str):
sentry_sdk.init(
dsn=dsn,
environment=environment,
traces_sample_rate=0.1, # 10% of requests traced
profiles_sample_rate=0.1,
integrations=[
FastApiIntegration(transaction_style="url"),
SqlalchemyIntegration(),
],
# Don't send PII
send_default_pii=False,
# Add context
before_send=before_send,
)
def before_send(event, hint):
# Scrub sensitive data
if "request" in event and "data" in event["request"]:
data = event["request"]["data"]
if isinstance(data, dict):
for key in ["password", "token", "api_key"]:
if key in data:
data[key] = "[REDACTED]"
return event
# Usage - errors auto-captured, or manually:
from sentry_sdk import capture_exception, capture_message, set_user
# Set user context
set_user({"id": str(user_id), "email": user.email})
# Capture with context
with sentry_sdk.push_scope() as scope:
scope.set_tag("assessment_id", str(assessment_id))
scope.set_context("assessment", {"skill_area": skill_area})
capture_exception(error)
Problem: Multi-step operations where it's unclear how far execution got.
logger = structlog.get_logger()
async def retake_assessment_flow(
user_id: UUID,
assessment_id: UUID,
skill_area: str,
session: AsyncSession,
):
flow_id = f"retake-{uuid4().hex[:8]}"
logger.info(
"retake_flow_started",
flow_id=flow_id,
user_id=str(user_id),
assessment_id=str(assessment_id),
skill_area=skill_area,
)
try:
# Step 1
logger.debug("retake_flow_step", flow_id=flow_id, step="load_completed")
completed = await load_completed_answers(assessment_id, session)
# Step 2
logger.debug("retake_flow_step", flow_id=flow_id, step="clear_answers")
await clear_skill_area_answers(user_id, skill_area, session)
# Step 3
logger.debug("retake_flow_step", flow_id=flow_id, step="enable_retake")
await enable_retake(user_id, assessment_id, skill_area, session)
logger.info(
"retake_flow_completed",
flow_id=flow_id,
user_id=str(user_id),
)
except Exception as e:
logger.error(
"retake_flow_failed",
flow_id=flow_id,
user_id=str(user_id),
error=str(e),
error_type=type(e).__name__,
)
raise
from fastapi import APIRouter
from datetime import datetime
router = APIRouter(tags=["Health"])
@router.get("/health")
async def health():
"""Basic liveness check."""
return {"status": "ok", "timestamp": datetime.utcnow().isoformat()}
@router.get("/health/ready")
async def readiness(session: AsyncSession = Depends(get_session)):
"""Readiness check - verify dependencies."""
checks = {}
# Database
try:
await session.execute(text("SELECT 1"))
checks["database"] = "ok"
except Exception as e:
checks["database"] = f"error: {e}"
# Redis (if used)
try:
await redis_client.ping()
checks["redis"] = "ok"
except Exception as e:
checks["redis"] = f"error: {e}"
all_ok = all(v == "ok" for v in checks.values())
return {
"status": "ok" if all_ok else "degraded",
"checks": checks,
"timestamp": datetime.utcnow().isoformat(),
}
SENSITIVE_KEYS = {"password", "token", "api_key", "secret", "authorization"}
def redact_sensitive(data: dict) -> dict:
"""Redact sensitive values from dict for logging."""
result = {}
for key, value in data.items():
if any(s in key.lower() for s in SENSITIVE_KEYS):
result[key] = "[REDACTED]"
elif isinstance(value, dict):
result[key] = redact_sensitive(value)
else:
result[key] = value
return result
# Use before logging request data
logger.info(
"request_received",
body=redact_sensitive(request_body),
)
When adding new features:
When debugging production:
development
Apple Human Interface Guidelines for content display components. Use this skill when the user asks about charts component, collection view, image view, web view, color well, image well, activity view, lockup, data visualization, content display, displaying images, rendering web content, color pickers, or presenting collections of items in Apple apps. Also use when the user says how should I display charts, what's the best way to show images, should I use a web view, how do I build a grid of items, what component shows media, or how do I present a share sheet. Cross-references: hig-foundations for color/typography/accessibility, hig-patterns for data visualization patterns, hig-components-layout for structural containers, hig-platforms for platform-specific component behavior.
tools
Automate HelpDesk tasks via Rube MCP (Composio): list tickets, manage views, use canned responses, and configure custom fields. Always search tools first for current schemas.
testing
Expert Haskell engineer specializing in advanced type systems, pure functional design, and high-reliability software. Use PROACTIVELY for type-level programming, concurrency, and architecture guidance.
tools
GraphQL gives clients exactly the data they need - no more, no less. One endpoint, typed schema, introspection. But the flexibility that makes it powerful also makes it dangerous. Without proper controls, clients can craft queries that bring down your server. This skill covers schema design, resolvers, DataLoader for N+1 prevention, federation for microservices, and client integration with Apollo/urql. Key insight: GraphQL is a contract. The schema is the API documentation. Design it carefully.