.claude/skills/ts-audit-logging/SKILL.md
Implement tamper-evident audit logs for compliance (SOC 2, HIPAA, PCI DSS). Use when building compliance audit trails, tracking who did what and when, or implementing immutable event logs that satisfy regulatory retention requirements.
Compliance audit logs must answer: who did what to which resource, when, from where, and with what result. They must also be tamper-evident — an auditor must be able to verify logs haven't been modified.
Regulatory retention requirements:

| Standard | Retention |
|----------|-----------|
| HIPAA | 6 years |
| PCI DSS | 12 months minimum, with the most recent 3 months immediately available |
| SOC 2 (typical) | 1 year |
| GDPR | Defined by purpose (often 1-3 years) |
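Retention periods like these usually end up as configuration that a purge job consults. The sketch below is one hedged way to express that: `RETENTION_YEARS` and `earliest_purge_date` are hypothetical names, only the HIPAA and SOC 2 figures listed above are encoded, and the longest applicable period is assumed to win when several standards apply.

```python
from datetime import date

# Retention periods in years, taken from the HIPAA and SOC 2 rows above.
# The dictionary name and keys are illustrative, not part of any standard.
RETENTION_YEARS = {"HIPAA": 6, "SOC2": 1}


def earliest_purge_date(event_date: date, standards: list[str]) -> date:
    """Return the first date on which an entry may be deleted.

    Assumes the longest retention period among the given standards applies.
    """
    years = max(RETENTION_YEARS[s] for s in standards)
    try:
        return event_date.replace(year=event_date.year + years)
    except ValueError:
        # Feb 29 with a non-leap target year: roll forward to Mar 1.
        return event_date.replace(year=event_date.year + years, month=3, day=1)


purge_on = earliest_purge_date(date(2024, 1, 15), ["HIPAA", "SOC2"])
```

A purge job would compare `purge_on` with the current date before touching an entry. With WORM storage the storage layer enforces the same limit independently.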
interface AuditLogEntry {
  // Core required fields
  id: string;                 // UUID: unique event identifier
  timestamp: string;          // ISO 8601 UTC: when the event occurred
  actor_id: string;           // User/service that performed the action
  actor_type: 'user' | 'service' | 'admin' | 'system';
  action: string;             // What happened: "read" | "write" | "delete" | "login" | "export"
  resource_type: string;      // "patient_record" | "invoice" | "user_account"
  resource_id: string;        // ID of the affected resource
  result: 'success' | 'failure' | 'denied';

  // Context fields
  ip_address: string;         // Source IP
  user_agent?: string;        // Browser/client identifier
  session_id?: string;        // Session identifier

  // Optional fields
  changed_fields?: string[];  // For write operations: which fields changed
  old_values?: Record<string, unknown>;  // Previous values (be careful with PII)
  reason?: string;            // Clinical justification (HIPAA), business reason

  // Tamper-evidence
  prev_hash?: string;         // Hash of previous log entry (hash chaining)
  hash: string;               // SHA-256 of this entry
}
Hash chaining makes log tampering detectable: each entry includes a hash of the previous entry. Modifying any entry breaks the chain.
import json
import hashlib
import uuid
from datetime import datetime, timezone
from typing import Optional


class AuditLogger:
    def __init__(self, db_connection):
        self.db = db_connection
        # In-process cache of the chain head; only safe with a single writer.
        self._last_hash: Optional[str] = None

    def _compute_hash(self, entry: dict, prev_hash: Optional[str]) -> str:
        """Compute SHA-256 of the log entry for tamper-evidence."""
        # Only the fields listed here are covered by the hash. Edits to any
        # other field (ip_address, reason, ...) will not break the chain.
        hashable = {
            "id": entry["id"],
            "timestamp": entry["timestamp"],
            "actor_id": entry["actor_id"],
            "action": entry["action"],
            "resource_type": entry["resource_type"],
            "resource_id": entry["resource_id"],
            "result": entry["result"],
            "prev_hash": prev_hash or "GENESIS",
        }
        content = json.dumps(hashable, sort_keys=True)
        return hashlib.sha256(content.encode()).hexdigest()

    async def log(
        self,
        actor_id: str,
        actor_type: str,
        action: str,
        resource_type: str,
        resource_id: str,
        result: str,
        ip_address: str,
        **kwargs
    ) -> dict:
        """Create and persist a tamper-evident audit log entry."""
        # Get last hash for chaining
        prev_hash = self._last_hash or await self._get_last_hash()

        entry = {
            "id": str(uuid.uuid4()),
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "actor_id": actor_id,
            "actor_type": actor_type,
            "action": action,
            "resource_type": resource_type,
            "resource_id": resource_id,
            "result": result,
            "ip_address": ip_address,
            "prev_hash": prev_hash or "GENESIS",
            **kwargs
        }
        entry["hash"] = self._compute_hash(entry, prev_hash)
        self._last_hash = entry["hash"]

        # Persist to append-only storage
        await self.db.audit_logs.insert_one(entry)
        return entry

    async def verify_chain(self, limit: int = 1000) -> bool:
        """Verify the hash chain integrity, starting from the first entry."""
        # Ordering relies on timestamps; entries with identical timestamps
        # need a tie-breaker (for example a sequence number) to verify reliably.
        entries = await self.db.audit_logs.find(
            sort=[("timestamp", 1)],
            limit=limit
        )

        prev_hash = "GENESIS"
        for i, entry in enumerate(entries):
            expected_hash = self._compute_hash(entry, prev_hash)
            if entry["hash"] != expected_hash:
                print(f"❌ Chain broken at entry {i}: id={entry['id']}")
                return False
            if entry["prev_hash"] != prev_hash:
                print(f"❌ prev_hash mismatch at entry {i}")
                return False
            prev_hash = entry["hash"]

        print(f"✅ Chain verified: {len(entries)} entries intact")
        return True

    async def _get_last_hash(self) -> Optional[str]:
        last = await self.db.audit_logs.find_one(sort=[("timestamp", -1)])
        return last["hash"] if last else None
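To see hash chaining catch a modification without any database, the following in-memory sketch may help. It is not the logger above: `entry_hash`, `append`, and `first_broken_index` are hypothetical helpers, the chain is a plain list, and every field of an entry is hashed rather than a fixed subset.

```python
import hashlib
import json


def entry_hash(fields: dict, prev_hash: str) -> str:
    """SHA-256 over the entry's fields plus the previous entry's hash."""
    payload = json.dumps({**fields, "prev_hash": prev_hash}, sort_keys=True)
    return hashlib.sha256(payload.encode()).hexdigest()


def append(chain: list, fields: dict) -> None:
    """Append an entry linked to the current chain head."""
    prev = chain[-1]["hash"] if chain else "GENESIS"
    chain.append({**fields, "prev_hash": prev, "hash": entry_hash(fields, prev)})


def first_broken_index(chain: list):
    """Return the index of the first invalid entry, or None if intact."""
    prev = "GENESIS"
    for i, entry in enumerate(chain):
        fields = {k: v for k, v in entry.items() if k not in ("prev_hash", "hash")}
        if entry["prev_hash"] != prev or entry["hash"] != entry_hash(fields, prev):
            return i
        prev = entry["hash"]
    return None


chain = []
for action in ("login", "read", "export"):
    append(chain, {"actor_id": "user-123", "action": action})

intact = first_broken_index(chain)      # chain is untouched at this point
chain[1]["action"] = "delete"           # tamper with the middle entry
broken_at = first_broken_index(chain)   # verification now fails at index 1
```

An attacker who can rewrite the stored hashes as well can rebuild the whole chain from the edited entry onward, so the chain head should also be anchored somewhere the attacker cannot write, such as the WORM storage covered later.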
const { v4: uuidv4 } = require('uuid');
const crypto = require('crypto');

class AuditLogger {
  constructor(db) {
    this.db = db;
  }

  async log({ actorId, actorType = 'user', action, resourceType, resourceId,
              result, ipAddress, sessionId, reason, changedFields }) {
    const prevHash = await this.getLastHash();

    const entry = {
      id: uuidv4(),
      timestamp: new Date().toISOString(),
      actor_id: actorId,
      actor_type: actorType,
      action,
      resource_type: resourceType,
      resource_id: resourceId,
      result,
      ip_address: ipAddress,
      session_id: sessionId,
      reason,
      changed_fields: changedFields,
      prev_hash: prevHash || 'GENESIS',
    };

    entry.hash = this.computeHash(entry);
    await this.db.auditLogs.create(entry);
    return entry;
  }

  computeHash(entry) {
    const hashable = JSON.stringify({
      id: entry.id,
      timestamp: entry.timestamp,
      actor_id: entry.actor_id,
      action: entry.action,
      resource_type: entry.resource_type,
      resource_id: entry.resource_id,
      result: entry.result,
      prev_hash: entry.prev_hash,
    });
    return crypto.createHash('sha256').update(hashable).digest('hex');
  }

  async getLastHash() {
    const last = await this.db.auditLogs.findOne({ order: [['timestamp', 'DESC']] });
    return last?.hash || null;
  }
}
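The JavaScript `computeHash` and the Python `_compute_hash` serialize the same fields differently: `JSON.stringify` keeps insertion order and emits no whitespace, while `json.dumps(..., sort_keys=True)` sorts keys and uses `", "` and `": "` separators. Their digests therefore differ for the same entry. If logs written by the JavaScript logger need to be verified from Python, something like the sketch below could reproduce the JavaScript byte layout. `JS_FIELD_ORDER`, `js_style_payload`, and `js_compatible_hash` are hypothetical names, and the match is only expected for plain string values.

```python
import hashlib
import json

# Field order used by computeHash() in the JavaScript logger above.
JS_FIELD_ORDER = (
    "id", "timestamp", "actor_id", "action",
    "resource_type", "resource_id", "result", "prev_hash",
)


def js_style_payload(entry: dict) -> str:
    """Serialize the hashed fields the way JSON.stringify would for strings."""
    ordered = {key: entry[key] for key in JS_FIELD_ORDER}
    # Compact separators and unescaped non-ASCII mirror JSON.stringify output.
    return json.dumps(ordered, separators=(",", ":"), ensure_ascii=False)


def js_compatible_hash(entry: dict) -> str:
    """SHA-256 hex digest over the JavaScript-style serialization."""
    return hashlib.sha256(js_style_payload(entry).encode("utf-8")).hexdigest()


sample = {
    "id": "e1",
    "timestamp": "2024-01-01T00:00:00.000Z",
    "actor_id": "user-123",
    "action": "read",
    "resource_type": "invoice",
    "resource_id": "inv-9",
    "result": "success",
    "prev_hash": "GENESIS",
    "ip_address": "10.0.0.1",  # present on the entry but not hashed
}
payload = js_style_payload(sample)
digest = js_compatible_hash(sample)
```

The timestamp is hashed as a string, so a store that normalizes timestamps on write would need the verifier to re-serialize them in exactly the original format.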
// Express middleware: auto-log all requests
const createAuditMiddleware = (auditLogger) => async (req, res, next) => {
  const start = Date.now();

  // Capture response
  const originalSend = res.send;
  res.send = function(body) {
    res.send = originalSend;

    // Log after response sent
    setImmediate(async () => {
      try {
        await auditLogger.log({
          actorId: req.user?.id || 'anonymous',
          // The schema allows only 'user' | 'service' | 'admin' | 'system',
          // so unauthenticated callers are recorded as 'user' with actorId 'anonymous'
          actorType: 'user',
          action: `${req.method.toLowerCase()}_${req.route?.path || req.path}`,
          resourceType: req.params.resourceType || 'api_request',
          resourceId: req.params.id || req.path,
          result: res.statusCode < 400 ? 'success' :
                  res.statusCode === 403 ? 'denied' : 'failure',
          ipAddress: req.ip,
          sessionId: req.session?.id,
          // Dropped by log() as written; extend log() and the table to persist it
          duration_ms: Date.now() - start,
        });
      } catch (err) {
        console.error('Audit log failed:', err);
        // Never let audit logging errors break the main flow
      }
    });

    return originalSend.apply(this, arguments);
  };

  next();
};
-- Append-only audit log table
-- The trigger below prevents UPDATE/DELETE; the CHECK constraints only validate column values
CREATE TABLE audit_logs (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  actor_id TEXT NOT NULL,
  actor_type TEXT NOT NULL CHECK (actor_type IN ('user', 'service', 'admin', 'system')),
  action TEXT NOT NULL,
  resource_type TEXT NOT NULL,
  resource_id TEXT NOT NULL,
  result TEXT NOT NULL CHECK (result IN ('success', 'failure', 'denied')),
  ip_address INET,
  session_id TEXT,
  reason TEXT,
  changed_fields TEXT[],
  prev_hash TEXT,
  hash TEXT NOT NULL UNIQUE
);

-- Create append-only trigger: prevent modification
-- Row-level triggers do not fire on TRUNCATE, so revoke that privilege separately
CREATE OR REPLACE FUNCTION prevent_audit_modification()
RETURNS TRIGGER AS $$
BEGIN
  RAISE EXCEPTION 'Audit logs are immutable: modification not allowed';
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER audit_logs_immutable
  BEFORE UPDATE OR DELETE ON audit_logs
  FOR EACH ROW EXECUTE FUNCTION prevent_audit_modification();

-- Indexes for common queries
CREATE INDEX idx_audit_actor_id ON audit_logs(actor_id);
CREATE INDEX idx_audit_resource ON audit_logs(resource_type, resource_id);
CREATE INDEX idx_audit_timestamp ON audit_logs(timestamp DESC);
CREATE INDEX idx_audit_action ON audit_logs(action);
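The append-only behaviour can be tried without a PostgreSQL server. The sketch below uses Python's built-in `sqlite3` as a stand-in, which is an assumption made for illustration only: SQLite needs one trigger per operation and uses `RAISE(ABORT, ...)` instead of `RAISE EXCEPTION`, the table is cut down to three columns, and `is_rejected` is a hypothetical helper.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE audit_logs (
    id     TEXT PRIMARY KEY,
    action TEXT NOT NULL,
    hash   TEXT NOT NULL UNIQUE
);
-- SQLite needs a separate trigger for each operation.
CREATE TRIGGER audit_logs_no_update BEFORE UPDATE ON audit_logs
BEGIN
    SELECT RAISE(ABORT, 'Audit logs are immutable');
END;
CREATE TRIGGER audit_logs_no_delete BEFORE DELETE ON audit_logs
BEGIN
    SELECT RAISE(ABORT, 'Audit logs are immutable');
END;
""")
conn.execute("INSERT INTO audit_logs VALUES ('e1', 'login', 'abc')")
conn.commit()


def is_rejected(sql: str) -> bool:
    """Return True if the statement is blocked by one of the triggers."""
    try:
        conn.execute(sql)
        return False
    except sqlite3.DatabaseError:
        return True


update_blocked = is_rejected("UPDATE audit_logs SET action = 'logout' WHERE id = 'e1'")
delete_blocked = is_rejected("DELETE FROM audit_logs WHERE id = 'e1'")
remaining = conn.execute("SELECT action FROM audit_logs").fetchall()
```

Triggers only guard against ordinary statements. Anyone able to drop or disable the trigger can still alter rows, which is why the hash chain and external WORM storage remain necessary.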
-- Query: All actions by a user in the last 30 days
SELECT timestamp, action, resource_type, resource_id, result, ip_address
FROM audit_logs
WHERE actor_id = 'user-123'
  AND timestamp > NOW() - INTERVAL '30 days'
ORDER BY timestamp DESC;

-- Query: All access to a specific patient record
SELECT timestamp, actor_id, action, result, ip_address, reason
FROM audit_logs
WHERE resource_type = 'patient_record'
  AND resource_id = 'patient-456'
ORDER BY timestamp DESC;

-- Query: Failed access attempts (security monitoring)
SELECT actor_id, COUNT(*) AS failed_attempts, MAX(timestamp) AS last_attempt
FROM audit_logs
WHERE result IN ('failure', 'denied')
  AND timestamp > NOW() - INTERVAL '24 hours'
GROUP BY actor_id
HAVING COUNT(*) > 5
ORDER BY failed_attempts DESC;
# Create S3 bucket with Object Lock (WORM - Write Once Read Many)
# The bucket must exist, with a bucket policy that lets CloudTrail write to it, before the trail is created
# Outside us-east-1, also pass --create-bucket-configuration LocationConstraint=<region>
aws s3api create-bucket \
  --bucket my-audit-logs-bucket \
  --object-lock-enabled-for-bucket

# Set default retention: compliance mode (cannot delete even by admin)
aws s3api put-object-lock-configuration \
  --bucket my-audit-logs-bucket \
  --object-lock-configuration '{
    "ObjectLockEnabled": "Enabled",
    "Rule": {
      "DefaultRetention": {
        "Mode": "COMPLIANCE",
        "Years": 6
      }
    }
  }'

# Create CloudTrail that logs to the immutable S3 bucket
aws cloudtrail create-trail \
  --name compliance-audit-trail \
  --s3-bucket-name my-audit-logs-bucket \
  --include-global-service-events \
  --is-multi-region-trail \
  --enable-log-file-validation  # Cryptographic log integrity

# A new trail records nothing until logging is started
aws cloudtrail start-logging --name compliance-audit-trail

# Verify log file integrity (CloudTrail)
aws cloudtrail validate-logs \
  --trail-arn arn:aws:cloudtrail:us-east-1:123456789012:trail/compliance-audit-trail \
  --start-time 2024-01-01T00:00:00Z
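import asyncio
import time

from fastapi import FastAPI, Request

app = FastAPI()

# Assumes `audit_logger` is an AuditLogger instance created at application startup.
# Hold references so pending log tasks are not garbage-collected before they finish.
_pending_audit_tasks: set = set()


@app.middleware("http")
async def audit_middleware(request: Request, call_next):
    start_time = time.time()

    response = await call_next(request)
    duration_ms = int((time.time() - start_time) * 1000)

    # Read the user after the handler runs, since authentication typically
    # populates request.state while the request is being processed
    user = getattr(request.state, "user", None)
    actor_id = user.id if user is not None else None

    # Determine result from status code
    if response.status_code < 400:
        result = "success"
    elif response.status_code == 403:
        result = "denied"
    else:
        result = "failure"

    # Log asynchronously to avoid blocking response
    task = asyncio.create_task(audit_logger.log(
        actor_id=actor_id or "anonymous",
        # The schema has no 'anonymous' actor_type, so unauthenticated
        # callers are recorded as 'user' with actor_id 'anonymous'
        actor_type="user",
        action=f"{request.method.lower()}:{request.url.path}",
        resource_type="api_endpoint",
        resource_id=str(request.url.path),
        result=result,
        # request.client can be None (for example behind some test clients)
        ip_address=request.client.host if request.client else "unknown",
        duration_ms=duration_ms,
        http_status=response.status_code,
    ))
    _pending_audit_tasks.add(task)
    task.add_done_callback(_pending_audit_tasks.discard)

    return response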