distributions/codex/skills/realtime-websocket-patterns/SKILL.md
Implement real-time features with WebSockets, Server-Sent Events, and long polling. Covers connection management, room-based messaging, presence tracking, and scaling strategies. Triggers on WebSocket implementation, real-time communication, or live update feature requests.
npx skillsauth add a-organvm/a-i--skills realtime-websocket-patternsInstall this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
3 of 9 scanners reported clean
Some scanners were skipped, did not run, or reported a non-clean status. Review each row below.
Build reliable real-time features with WebSockets, SSE, and proper connection management.
| Technology | Direction | Use Case | Complexity | |-----------|-----------|----------|------------| | WebSocket | Bidirectional | Chat, collaboration, gaming | High | | SSE | Server → Client | Notifications, dashboards, feeds | Low | | Long Polling | Request/Response | Fallback, simple updates | Low | | WebTransport | Bidirectional | Low-latency, unreliable OK | Very High |
Need bidirectional? ──yes──→ WebSocket
│no
▼
Need low latency? ──yes──→ SSE
│no
▼
Simple updates? ──yes──→ Long Polling
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from dataclasses import dataclass, field
app = FastAPI()
@dataclass
class ConnectionManager:
connections: dict[str, set[WebSocket]] = field(default_factory=dict)
async def connect(self, websocket: WebSocket, room: str):
await websocket.accept()
self.connections.setdefault(room, set()).add(websocket)
async def disconnect(self, websocket: WebSocket, room: str):
self.connections.get(room, set()).discard(websocket)
async def broadcast(self, room: str, message: dict):
for ws in list(self.connections.get(room, set())):
try:
await ws.send_json(message)
except Exception:
self.connections[room].discard(ws)
manager = ConnectionManager()
@app.websocket("/ws/{room}")
async def websocket_endpoint(websocket: WebSocket, room: str):
await manager.connect(websocket, room)
try:
while True:
data = await websocket.receive_json()
await manager.broadcast(room, {
"type": "message",
"room": room,
"data": data,
})
except WebSocketDisconnect:
await manager.disconnect(websocket, room)
await manager.broadcast(room, {
"type": "user_left",
"room": room,
})
from fastapi import Query, status
@app.websocket("/ws/{room}")
async def websocket_endpoint(
websocket: WebSocket,
room: str,
token: str = Query(...), # allow-secret
):
user = await verify_token(token)
if not user:
await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
return
await manager.connect(websocket, room, user_id=user.id)
# ...
from enum import Enum
from pydantic import BaseModel
class MessageType(str, Enum):
TEXT = "text"
PRESENCE = "presence"
TYPING = "typing"
SYSTEM = "system"
ERROR = "error"
ACK = "ack"
class WsMessage(BaseModel):
type: MessageType
id: str | None = None # For acknowledgment
room: str | None = None
data: dict = {}
timestamp: float
# Client sends
{"type": "text", "id": "msg_123", "room": "general", "data": {"content": "Hello"}}
# Server acknowledges
{"type": "ack", "id": "msg_123", "data": {"status": "delivered"}}
# Server broadcasts
{"type": "text", "room": "general", "data": {"content": "Hello", "author": "user_42"}, "timestamp": 1711000000}
import asyncio
from collections import defaultdict
class PresenceTracker:
def __init__(self, timeout: float = 30.0):
self.timeout = timeout
self.presence: dict[str, dict[str, float]] = defaultdict(dict) # room → {user_id: last_seen}
async def heartbeat(self, room: str, user_id: str):
self.presence[room][user_id] = asyncio.get_event_loop().time()
async def get_online(self, room: str) -> list[str]:
now = asyncio.get_event_loop().time()
return [
uid for uid, last_seen in self.presence.get(room, {}).items()
if now - last_seen < self.timeout
]
async def cleanup_loop(self):
while True:
now = asyncio.get_event_loop().time()
for room in list(self.presence.keys()):
expired = [
uid for uid, ts in self.presence[room].items()
if now - ts > self.timeout
]
for uid in expired:
del self.presence[room][uid]
await asyncio.sleep(self.timeout / 2)
from sse_starlette.sse import EventSourceResponse
@app.get("/events/{room}")
async def event_stream(room: str):
async def generate():
queue = asyncio.Queue()
event_bus.subscribe(room, queue)
try:
while True:
event = await queue.get()
yield {
"event": event["type"],
"data": json.dumps(event["data"]),
"id": event.get("id"),
}
finally:
event_bus.unsubscribe(room, queue)
return EventSourceResponse(generate())
const events = new EventSource('/events/general');
events.addEventListener('message', (e) => {
const data = JSON.parse(e.data);
handleMessage(data);
});
events.addEventListener('presence', (e) => {
const data = JSON.parse(e.data);
updateOnlineUsers(data);
});
events.onerror = () => {
// Auto-reconnects with Last-Event-ID header
console.log('Connection lost, reconnecting...');
};
import redis.asyncio as redis
class RedisPubSubBridge:
def __init__(self, redis_url: str):
self.redis = redis.from_url(redis_url)
self.local_manager = ConnectionManager()
async def publish(self, room: str, message: dict):
await self.redis.publish(f"ws:{room}", json.dumps(message))
async def subscribe_loop(self, room: str):
pubsub = self.redis.pubsub()
await pubsub.subscribe(f"ws:{room}")
async for message in pubsub.listen():
if message["type"] == "message":
data = json.loads(message["data"])
await self.local_manager.broadcast(room, data)
class ReconnectingWebSocket {
constructor(url, options = {}) {
this.url = url;
this.maxRetries = options.maxRetries || 10;
this.baseDelay = options.baseDelay || 1000;
this.retries = 0;
this.connect();
}
connect() {
this.ws = new WebSocket(this.url);
this.ws.onopen = () => { this.retries = 0; };
this.ws.onclose = () => { this.reconnect(); };
this.ws.onmessage = (e) => { this.onmessage?.(e); };
}
reconnect() {
if (this.retries >= this.maxRetries) return;
const delay = Math.min(this.baseDelay * Math.pow(2, this.retries), 30000);
setTimeout(() => { this.retries++; this.connect(); }, delay);
}
}
class OrderedMessageBuffer:
def __init__(self):
self.last_seq = 0
self.buffer: dict[int, dict] = {}
def process(self, message: dict) -> list[dict]:
seq = message.get("seq", 0)
self.buffer[seq] = message
ordered = []
while self.last_seq + 1 in self.buffer:
self.last_seq += 1
ordered.append(self.buffer.pop(self.last_seq))
return ordered
testing
Designs systems for encoding, scoring, and generating choreographic movement using Laban notation, computational geometry, and procedural animation principles.
tools
Manage monorepos and multi-package repositories with workspace tools, dependency management, selective builds, and change detection. Covers npm/pnpm workspaces, Turborepo, and Python monorepo patterns. Triggers on monorepo setup, workspace management, or multi-package repository requests.
development
Curated bundle for managing monorepos with containerized deployment pipelines. Includes monorepo management, Docker containerization, CI/CD deployment, and coding standards. Use when setting up or improving multi-package repository infrastructure.
development
Apply modular synthesis principles to system design, workflow architecture, and conceptual frameworks. Use when designing modular systems, creating architecture diagrams using synthesis metaphors, applying signal flow thinking to data pipelines, or translating between audio engineering and software concepts. Triggers on modular architecture design, signal flow diagrams, synthesis-inspired system thinking, or "oscillator/patch" metaphors.