.build/direct/example/realtime-websocket-patterns/SKILL.md
Implement real-time features with WebSockets, Server-Sent Events, and long polling. Covers connection management, room-based messaging, presence tracking, and scaling strategies. Triggers on WebSocket implementation, real-time communication, or live update feature requests.
Build reliable real-time features with WebSockets, SSE, and proper connection management.

| Technology | Direction | Use Case | Complexity |
|-----------|-----------|----------|------------|
| WebSocket | Bidirectional | Chat, collaboration, gaming | High |
| SSE | Server → Client | Notifications, dashboards, feeds | Low |
| Long Polling | Request/Response | Fallback, simple updates | Low |
| WebTransport | Bidirectional | Low-latency, unreliable OK | Very High |

Need bidirectional? ──yes──→ WebSocket
        │no
        ▼
Need low latency?   ──yes──→ SSE
        │no
        ▼
Simple updates?     ──yes──→ Long Polling
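
The decision tree above can be read as a small function. This is only a sketch of that tree as written; the function name `choose_transport` and its boolean parameters are illustrative assumptions, not part of the original skill.

```python
from typing import Optional


def choose_transport(
    bidirectional: bool, low_latency: bool, simple_updates: bool
) -> Optional[str]:
    """Walk the decision tree top to bottom and return the first match."""
    if bidirectional:
        return "WebSocket"
    if low_latency:
        return "SSE"
    if simple_updates:
        return "Long Polling"
    # The tree does not cover this case, so no recommendation is made.
    return None


print(choose_transport(bidirectional=False, low_latency=True, simple_updates=False))
```

The questions are checked in order, so a feature that needs bidirectional traffic resolves to WebSocket regardless of the other answers.
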
# Room-based WebSocket connection manager (FastAPI)
from dataclasses import dataclass, field

from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()


@dataclass
class ConnectionManager:
    # room name -> sockets currently joined to that room
    connections: dict[str, set[WebSocket]] = field(default_factory=dict)

    async def connect(self, websocket: WebSocket, room: str):
        await websocket.accept()
        self.connections.setdefault(room, set()).add(websocket)

    async def disconnect(self, websocket: WebSocket, room: str):
        self.connections.get(room, set()).discard(websocket)

    async def broadcast(self, room: str, message: dict):
        # Iterate over a copy so dead sockets can be dropped mid-loop.
        for ws in list(self.connections.get(room, set())):
            try:
                await ws.send_json(message)
            except Exception:
                # .get() avoids a KeyError if the room vanished meanwhile.
                self.connections.get(room, set()).discard(ws)


manager = ConnectionManager()


@app.websocket("/ws/{room}")
async def websocket_endpoint(websocket: WebSocket, room: str):
    await manager.connect(websocket, room)
    try:
        while True:
            data = await websocket.receive_json()
            await manager.broadcast(room, {
                "type": "message",
                "room": room,
                "data": data,
            })
    except WebSocketDisconnect:
        await manager.disconnect(websocket, room)
        await manager.broadcast(room, {
            "type": "user_left",
            "room": room,
        })

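# Variant of the endpoint above with token authentication
from fastapi import Query, status


@app.websocket("/ws/{room}")
async def websocket_endpoint(
    websocket: WebSocket,
    room: str,
    token: str = Query(...),  # allow-secret
):
    # verify_token is not defined here; it is expected to return a user or None.
    user = await verify_token(token)
    if not user:
        # Closing before accept() rejects the handshake.
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
        return
    # Requires ConnectionManager.connect to be extended to accept user_id.
    await manager.connect(websocket, room, user_id=user.id)
    # ...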
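
The authenticated endpoint calls `verify_token` without defining it. One possible shape, sketched with only the standard library, is an HMAC-signed token that carries a user id and an expiry time. Everything here (`SECRET_KEY`, `User`, `issue_token`, and the token layout) is a hypothetical stand-in; a real deployment would more likely use an established token format and its library.

```python
import asyncio
import base64
import hashlib
import hmac
import time
from dataclasses import dataclass
from typing import Optional

SECRET_KEY = b"change-me"  # hypothetical; load from configuration in practice


@dataclass
class User:
    id: str


def _sign(payload: bytes) -> str:
    return hmac.new(SECRET_KEY, payload, hashlib.sha256).hexdigest()


def issue_token(user_id: str, ttl_seconds: int = 300) -> str:
    """Build '<base64 payload>.<hex signature>' for the given user."""
    expires = int(time.time()) + ttl_seconds
    payload = f"{user_id}:{expires}".encode()
    return base64.urlsafe_b64encode(payload).decode() + "." + _sign(payload)


async def verify_token(token: str) -> Optional[User]:
    """Return the user for a valid, unexpired token, otherwise None."""
    try:
        encoded, signature = token.rsplit(".", 1)
        payload = base64.urlsafe_b64decode(encoded.encode())
        user_id, expires = payload.decode().rsplit(":", 1)
        expires_at = int(expires)
    except (ValueError, UnicodeDecodeError):
        return None  # malformed token
    # Constant-time comparison to avoid leaking signature prefixes.
    if not hmac.compare_digest(signature.encode(), _sign(payload).encode()):
        return None
    if expires_at < time.time():
        return None
    return User(id=user_id)


token = issue_token("user_42")
print(asyncio.run(verify_token(token)))
```

Passing the token as a query parameter, as the endpoint does, means it can end up in access logs, so short expiry times are a reasonable default under this design.
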
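
# Message envelope shared by client and server
import time
from enum import Enum

from pydantic import BaseModel, Field


class MessageType(str, Enum):
    TEXT = "text"
    PRESENCE = "presence"
    TYPING = "typing"
    SYSTEM = "system"
    ERROR = "error"
    ACK = "ack"


class WsMessage(BaseModel):
    type: MessageType
    id: str | None = None  # For acknowledgment
    room: str | None = None
    data: dict = Field(default_factory=dict)
    # Defaults to the receive time, since client messages may omit it.
    timestamp: float = Field(default_factory=time.time)
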
# Client sends
{"type": "text", "id": "msg_123", "room": "general", "data": {"content": "Hello"}}
# Server acknowledges
{"type": "ack", "id": "msg_123", "data": {"status": "delivered"}}
# Server broadcasts
{"type": "text", "room": "general", "data": {"content": "Hello", "author": "user_42"}, "timestamp": 1711000000}
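
The three example frames above imply a simple flow: the server acknowledges the client's message by `id`, then rebroadcasts it with an author and a timestamp. A minimal sketch of that step follows, using plain dictionaries; the function name `handle_client_message` and the `author` argument are assumptions for illustration.

```python
import json
import time
from typing import Any, Dict, Tuple


def handle_client_message(
    raw: str, author: str
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Return (ack, broadcast) envelopes for one client message."""
    msg = json.loads(raw)
    # The ack echoes the client's id so the sender can match it up.
    ack = {"type": "ack", "id": msg["id"], "data": {"status": "delivered"}}
    # The broadcast drops the id and adds server-side metadata.
    broadcast = {
        "type": msg["type"],
        "room": msg["room"],
        "data": {**msg["data"], "author": author},
        "timestamp": time.time(),
    }
    return ack, broadcast


raw = '{"type": "text", "id": "msg_123", "room": "general", "data": {"content": "Hello"}}'
ack, broadcast = handle_client_message(raw, author="user_42")
print(ack)
print(broadcast)
```

The ack would go only to the sender, while the broadcast envelope goes to everyone in the room.
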

# In-memory presence tracking driven by client heartbeats
import asyncio
import time
from collections import defaultdict


class PresenceTracker:
    def __init__(self, timeout: float = 30.0):
        self.timeout = timeout
        # room -> {user_id: last_seen}, measured on a monotonic clock
        self.presence: dict[str, dict[str, float]] = defaultdict(dict)

    async def heartbeat(self, room: str, user_id: str):
        self.presence[room][user_id] = time.monotonic()

    async def get_online(self, room: str) -> list[str]:
        now = time.monotonic()
        return [
            uid for uid, last_seen in self.presence.get(room, {}).items()
            if now - last_seen < self.timeout
        ]

    async def cleanup_loop(self):
        # Run as a background task; evicts users whose heartbeat expired.
        while True:
            now = time.monotonic()
            for room in list(self.presence.keys()):
                expired = [
                    uid for uid, ts in self.presence[room].items()
                    if now - ts > self.timeout
                ]
                for uid in expired:
                    del self.presence[room][uid]
            await asyncio.sleep(self.timeout / 2)

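
Clients usually want presence changes rather than full lists. One way to derive them, sketched here, is to compare two successive snapshots of the online users for a room. The function name `presence_diff` and the `status` field are illustrative assumptions; only the `presence` message type comes from the protocol above.

```python
from typing import Dict, List, Set


def presence_diff(previous: Set[str], current: Set[str]) -> List[Dict]:
    """Build presence events for users who came online or went offline."""
    events = [
        {"type": "presence", "data": {"user": uid, "status": "online"}}
        for uid in sorted(current - previous)
    ]
    events += [
        {"type": "presence", "data": {"user": uid, "status": "offline"}}
        for uid in sorted(previous - current)
    ]
    return events


for event in presence_diff({"alice", "bob"}, {"bob", "carol"}):
    print(event)
```

Sorting keeps the event order deterministic, which makes the output easier to test.
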
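
# Server-Sent Events endpoint (FastAPI + sse-starlette)
import asyncio
import json

from sse_starlette.sse import EventSourceResponse


@app.get("/events/{room}")
async def event_stream(room: str):
    async def generate():
        # event_bus is not defined here; it is expected to offer
        # subscribe(room, queue) and unsubscribe(room, queue).
        queue = asyncio.Queue()
        event_bus.subscribe(room, queue)
        try:
            while True:
                event = await queue.get()
                yield {
                    "event": event["type"],
                    "data": json.dumps(event["data"]),
                    "id": event.get("id"),
                }
        finally:
            # Runs when the client disconnects and the generator is closed.
            event_bus.unsubscribe(room, queue)

    return EventSourceResponse(generate())
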
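
The SSE endpoint relies on an `event_bus` object that is not shown. A minimal in-memory version might look like the sketch below, assuming a single process and one `asyncio.Queue` per connected client; the class name `EventBus` and its `publish` method are assumptions, only `subscribe` and `unsubscribe` appear in the original.

```python
import asyncio
from collections import defaultdict
from typing import Dict, Set


class EventBus:
    """Fan out events to every queue subscribed to a room."""

    def __init__(self) -> None:
        self._subscribers: Dict[str, Set[asyncio.Queue]] = defaultdict(set)

    def subscribe(self, room: str, queue: asyncio.Queue) -> None:
        self._subscribers[room].add(queue)

    def unsubscribe(self, room: str, queue: asyncio.Queue) -> None:
        self._subscribers[room].discard(queue)
        if not self._subscribers[room]:
            del self._subscribers[room]  # drop empty rooms

    def publish(self, room: str, event: dict) -> int:
        """Enqueue the event for each subscriber; return how many got it."""
        queues = list(self._subscribers.get(room, ()))
        for queue in queues:
            queue.put_nowait(event)
        return len(queues)


async def demo() -> None:
    bus = EventBus()
    queue: asyncio.Queue = asyncio.Queue()
    bus.subscribe("general", queue)
    bus.publish("general", {"type": "message", "data": {"content": "hi"}})
    print(await queue.get())
    bus.unsubscribe("general", queue)


asyncio.run(demo())
```

Unbounded queues keep the sketch short, but a slow client would then accumulate events without limit; a bounded queue with a drop policy is one alternative.
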

// Browser client for the SSE endpoint
const events = new EventSource('/events/general');

events.addEventListener('message', (e) => {
  const data = JSON.parse(e.data);
  handleMessage(data);
});

events.addEventListener('presence', (e) => {
  const data = JSON.parse(e.data);
  updateOnlineUsers(data);
});

events.onerror = () => {
  // Auto-reconnects with Last-Event-ID header
  console.log('Connection lost, reconnecting...');
};

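
The browser resends the id of the last event it received in the `Last-Event-ID` header when it reconnects, but the server only benefits if it can replay what was missed. A possible server-side counterpart is a bounded buffer of recent events, sketched below. The class name `ReplayBuffer`, the integer ids, and the buffer size are assumptions for illustration.

```python
from collections import deque
from typing import Deque, List, Optional, Tuple


class ReplayBuffer:
    """Keep the most recent events so reconnecting clients can catch up."""

    def __init__(self, maxlen: int = 1000) -> None:
        self._events: Deque[Tuple[int, dict]] = deque(maxlen=maxlen)
        self._next_id = 1

    def append(self, event: dict) -> int:
        """Store an event and return the id to send in the SSE 'id' field."""
        event_id = self._next_id
        self._next_id += 1
        self._events.append((event_id, event))
        return event_id

    def since(self, last_event_id: Optional[str]) -> List[Tuple[int, dict]]:
        """Return events newer than the id from the Last-Event-ID header."""
        if last_event_id is None:
            return []  # first connection, nothing to replay
        try:
            last = int(last_event_id)
        except ValueError:
            return []  # unrecognized id, skip replay
        return [(i, e) for i, e in self._events if i > last]


buffer = ReplayBuffer(maxlen=3)
for content in ["a", "b", "c", "d"]:
    buffer.append({"type": "message", "data": {"content": content}})
print(buffer.since("2"))
```

Events older than the buffer size are lost, so a client that was away for too long would need a full refresh instead of a replay.
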

# Scaling across server instances with Redis pub/sub
import json

import redis.asyncio as redis


class RedisPubSubBridge:
    def __init__(self, redis_url: str):
        self.redis = redis.from_url(redis_url)
        # Sockets connected to this process only.
        self.local_manager = ConnectionManager()

    async def publish(self, room: str, message: dict):
        # Every instance subscribed to the room's channel receives this.
        await self.redis.publish(f"ws:{room}", json.dumps(message))

    async def subscribe_loop(self, room: str):
        pubsub = self.redis.pubsub()
        await pubsub.subscribe(f"ws:{room}")
        async for message in pubsub.listen():
            # Skip subscribe confirmations; forward only published payloads.
            if message["type"] == "message":
                data = json.loads(message["data"])
                await self.local_manager.broadcast(room, data)


// Client-side reconnection with exponential backoff
class ReconnectingWebSocket {
  constructor(url, options = {}) {
    this.url = url;
    // ?? keeps an explicit 0, which || would replace with the default.
    this.maxRetries = options.maxRetries ?? 10;
    this.baseDelay = options.baseDelay ?? 1000;
    this.retries = 0;
    this.connect();
  }

  connect() {
    this.ws = new WebSocket(this.url);
    this.ws.onopen = () => { this.retries = 0; };
    this.ws.onclose = () => { this.reconnect(); };
    this.ws.onmessage = (e) => { this.onmessage?.(e); };
  }

  reconnect() {
    if (this.retries >= this.maxRetries) return;
    // Double the delay on each attempt, capped at 30 seconds.
    const delay = Math.min(this.baseDelay * Math.pow(2, this.retries), 30000);
    setTimeout(() => { this.retries++; this.connect(); }, delay);
  }
}

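
The delay schedule used by the reconnecting client is easier to inspect in isolation. The sketch below reproduces the same formula, `min(base * 2^retries, 30000)`, in Python. The optional `jitter` flag is an addition not present in the original client; it illustrates one common way to keep many clients from reconnecting at the same instant.

```python
import random


def backoff_delay_ms(
    retries: int,
    base_delay_ms: int = 1000,
    cap_ms: int = 30_000,
    jitter: bool = False,
) -> float:
    """Exponential backoff delay in milliseconds, capped at cap_ms."""
    delay = min(base_delay_ms * 2 ** retries, cap_ms)
    if jitter:
        # "Full jitter": pick uniformly between 0 and the computed delay.
        return random.uniform(0, delay)
    return float(delay)


print([backoff_delay_ms(n) for n in range(7)])
# → [1000.0, 2000.0, 4000.0, 8000.0, 16000.0, 30000.0, 30000.0]
```

With the default settings the cap is reached on the sixth attempt, so the remaining retries each wait 30 seconds.
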
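
# Deliver messages in sequence order, buffering any that arrive early
class OrderedMessageBuffer:
    def __init__(self):
        self.last_seq = 0  # highest sequence number already delivered
        self.buffer: dict[int, dict] = {}

    def process(self, message: dict) -> list[dict]:
        seq = message.get("seq", 0)
        # Drop duplicates and stale messages so they never linger in the buffer.
        if seq <= self.last_seq:
            return []
        self.buffer[seq] = message
        ordered = []
        # Release every message that is now contiguous with last_seq.
        while self.last_seq + 1 in self.buffer:
            self.last_seq += 1
            ordered.append(self.buffer.pop(self.last_seq))
        return ordered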
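
The ordering buffer expects every message to carry a `seq` field that starts at 1 and increases by one. The original does not show where that number comes from; one possible sender-side counterpart, sketched here, keeps a counter per room. The class name `SequenceStamper` and the per-room scoping are assumptions.

```python
import itertools
from collections import defaultdict
from typing import DefaultDict, Iterator


class SequenceStamper:
    """Attach a per-room, monotonically increasing 'seq' to messages."""

    def __init__(self) -> None:
        # Each room gets its own counter starting at 1.
        self._counters: DefaultDict[str, Iterator[int]] = defaultdict(
            lambda: itertools.count(1)
        )

    def stamp(self, room: str, message: dict) -> dict:
        """Return a copy of the message with the next sequence number."""
        return {**message, "seq": next(self._counters[room])}


stamper = SequenceStamper()
print(stamper.stamp("general", {"type": "text", "data": {"content": "first"}}))
print(stamper.stamp("general", {"type": "text", "data": {"content": "second"}}))
```

A counter held in process memory only works with a single sender per room; with several server instances the sequence would need a shared source, such as an atomic increment in a shared store.
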