distributions/codex/skills/resilience-patterns/SKILL.md
Build fault-tolerant systems with circuit breakers, retries with backoff, bulkheads, timeouts, and graceful degradation. Covers distributed system failure modes and recovery strategies. Triggers on reliability engineering, fault tolerance, or distributed system resilience requests.
npx skillsauth add organvm-iv-taxis/a-i--skills resilience-patternsInstall this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
3 of 9 scanners reported clean
Some scanners were skipped, did not run, or reported a non-clean status. Review each row below.
Build systems that survive partial failures and degrade gracefully.
import asyncio
import random
from functools import wraps
def retry(max_attempts: int = 3, base_delay: float = 1.0, max_delay: float = 30.0):
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
for attempt in range(max_attempts):
try:
return await func(*args, **kwargs)
except Exception as e:
if attempt == max_attempts - 1:
raise
delay = min(base_delay * (2 ** attempt) + random.uniform(0, 1), max_delay)
await asyncio.sleep(delay)
return wrapper
return decorator
@retry(max_attempts=3, base_delay=1.0)
async def fetch_data(url: str) -> dict:
async with httpx.AsyncClient() as client:
response = await client.get(url, timeout=10)
response.raise_for_status()
return response.json()
import time
from enum import Enum
class CircuitState(Enum):
CLOSED = "closed" # Normal operation
OPEN = "open" # Failing, reject requests
HALF_OPEN = "half_open" # Testing recovery
class CircuitBreaker:
def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 30.0):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.state = CircuitState.CLOSED
self.failure_count = 0
self.last_failure_time = 0.0
async def call(self, func, *args, **kwargs):
if self.state == CircuitState.OPEN:
if time.time() - self.last_failure_time > self.recovery_timeout:
self.state = CircuitState.HALF_OPEN
else:
raise CircuitOpenError(f"Circuit open, retry after {self.recovery_timeout}s")
try:
result = await func(*args, **kwargs)
self._on_success()
return result
except Exception as e:
self._on_failure()
raise
def _on_success(self):
self.failure_count = 0
self.state = CircuitState.CLOSED
def _on_failure(self):
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
async def with_timeout(coro, seconds: float):
try:
return await asyncio.wait_for(coro, timeout=seconds)
except asyncio.TimeoutError:
raise TimeoutError(f"Operation timed out after {seconds}s")
class Bulkhead:
"""Limit concurrent access to a resource."""
def __init__(self, max_concurrent: int = 10):
self.semaphore = asyncio.Semaphore(max_concurrent)
async def execute(self, func, *args, **kwargs):
async with self.semaphore:
return await func(*args, **kwargs)
# Isolate different downstream services
payment_bulkhead = Bulkhead(max_concurrent=5)
inventory_bulkhead = Bulkhead(max_concurrent=20)
async def get_user_profile(user_id: str) -> dict:
try:
return await primary_service.get_profile(user_id)
except ServiceUnavailable:
try:
return await cache.get_profile(user_id) # Stale cache fallback
except CacheMiss:
return {"user_id": user_id, "name": "Unknown", "_fallback": True}
Chain patterns for defense in depth:
Request → Timeout → Bulkhead → Circuit Breaker → Retry → Service Call
class ResilientClient:
def __init__(self):
self.circuit = CircuitBreaker(failure_threshold=5)
self.bulkhead = Bulkhead(max_concurrent=10)
@retry(max_attempts=3, base_delay=0.5)
async def call(self, url: str) -> dict:
return await with_timeout(
self.bulkhead.execute(
self.circuit.call, self._do_request, url
),
seconds=15
)
async def _do_request(self, url: str) -> dict:
async with httpx.AsyncClient() as client:
response = await client.get(url, timeout=5)
response.raise_for_status()
return response.json()
import asyncio
from collections import deque
class RateLimiter:
def __init__(self, rate: int, per: float = 1.0):
self.rate = rate
self.per = per
self.tokens = rate
self.last_refill = time.time()
self.lock = asyncio.Lock()
async def acquire(self):
async with self.lock:
now = time.time()
elapsed = now - self.last_refill
self.tokens = min(self.rate, self.tokens + elapsed * (self.rate / self.per))
self.last_refill = now
if self.tokens >= 1:
self.tokens -= 1
return True
return False
from enum import Enum
class HealthStatus(Enum):
HEALTHY = "healthy"
DEGRADED = "degraded"
UNHEALTHY = "unhealthy"
async def health_check() -> dict:
checks = {
"database": check_database(),
"cache": check_cache(),
"external_api": check_external_api(),
}
results = {}
for name, check in checks.items():
try:
await asyncio.wait_for(check, timeout=5)
results[name] = HealthStatus.HEALTHY
except Exception:
results[name] = HealthStatus.UNHEALTHY
overall = (
HealthStatus.HEALTHY if all(v == HealthStatus.HEALTHY for v in results.values())
else HealthStatus.DEGRADED if any(v == HealthStatus.HEALTHY for v in results.values())
else HealthStatus.UNHEALTHY
)
return {"status": overall.value, "checks": {k: v.value for k, v in results.items()}}
| Failure Mode | Pattern | Recovery | |-------------|---------|----------| | Transient network error | Retry with backoff | Automatic | | Service down | Circuit breaker | Automatic after recovery | | Service overloaded | Bulkhead + rate limit | Shed load | | Slow response | Timeout | Fail fast | | Cascade failure | Circuit breaker + bulkhead | Isolate blast radius | | Data corruption | Idempotent operations | Safe retry |
async def process_payment(idempotency_key: str, amount: float):
existing = await db.get_by_idempotency_key(idempotency_key)
if existing:
return existing # Already processed
result = await payment_gateway.charge(amount)
await db.store(idempotency_key=idempotency_key, result=result)
return result
development
Optimize resumes and CVs for impact, ATS compatibility, and audience targeting. Supports multiple formats (chronological, functional, hybrid), accomplishment framing (STAR/XYZ), and tailoring for specific roles. Triggers on resume review, CV update, job application prep, or career document requests.
testing
Transfer context between AI agent sessions with structured handoff protocols, state serialization, and decision log preservation. Covers multi-agent coordination, context compression, and continuity patterns. Triggers on agent handoff, session transfer, or multi-agent continuity requests.
tools
Craft compelling fiction and creative nonfiction with attention to structure, voice, prose style, and revision. Supports short stories, novel chapters, essays, and hybrid forms. Triggers on creative writing, fiction writing, story craft, prose style, or literary technique requests.
devops
Transform AI conversations and chat transcripts into publishable content including blog posts, documentation, tutorials, and knowledge base entries. Covers extraction, restructuring, and editorial refinement. Triggers on conversation-to-content, transcript processing, or chat-to-doc requests.