.agents/skills/event-sourcing-cqrs/SKILL.md
Event Sourcing and CQRS patterns. Event stores, projections, snapshots, command handlers, query models, and eventual consistency. Covers EventStoreDB, Axon Framework, and custom implementations. USE WHEN: user mentions "event sourcing", "CQRS", "event store", "projection", "command handler", "read model", "write model", "EventStoreDB", "Axon" DO NOT USE FOR: simple event-driven architecture - use `event-driven`; message brokers - use messaging skills; DDD basics - use `ddd`
npx skillsauth add d-subrahmanyam/deno-fresh-microservices event-sourcing-cqrsInstall this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
3 of 9 scanners reported clean
Some scanners were skipped, did not run, or reported a non-clean status. Review each row below.
Command ──▶ Command Handler ──▶ Aggregate ──▶ Events ──▶ Event Store
│
┌──────┘
▼
Projections ──▶ Read Models ──▶ Queries
interface DomainEvent {
eventId: string;
aggregateId: string;
type: string;
data: unknown;
version: number;
timestamp: Date;
}
class EventStore {
async append(aggregateId: string, events: DomainEvent[], expectedVersion: number): Promise<void> {
// Optimistic concurrency: check current version matches expected
const current = await this.getCurrentVersion(aggregateId);
if (current !== expectedVersion) throw new ConcurrencyError(aggregateId);
await db.events.createMany({
data: events.map((e, i) => ({ ...e, version: expectedVersion + i + 1 })),
});
}
async getEvents(aggregateId: string): Promise<DomainEvent[]> {
return db.events.findMany({
where: { aggregateId },
orderBy: { version: 'asc' },
});
}
}
class OrderAggregate {
private id!: string;
private status!: string;
private items: OrderItem[] = [];
private version = 0;
private uncommittedEvents: DomainEvent[] = [];
static fromHistory(events: DomainEvent[]): OrderAggregate {
const aggregate = new OrderAggregate();
events.forEach((e) => aggregate.apply(e, false));
return aggregate;
}
createOrder(id: string, items: OrderItem[]) {
this.apply({ type: 'OrderCreated', data: { id, items } }, true);
}
confirmOrder() {
if (this.status !== 'pending') throw new DomainError('Cannot confirm');
this.apply({ type: 'OrderConfirmed', data: { id: this.id } }, true);
}
private apply(event: Partial<DomainEvent>, isNew: boolean) {
switch (event.type) {
case 'OrderCreated':
this.id = event.data.id;
this.items = event.data.items;
this.status = 'pending';
break;
case 'OrderConfirmed':
this.status = 'confirmed';
break;
}
this.version++;
if (isNew) this.uncommittedEvents.push(event as DomainEvent);
}
}
class ConfirmOrderHandler {
constructor(private eventStore: EventStore) {}
async handle(cmd: ConfirmOrderCommand): Promise<void> {
const events = await this.eventStore.getEvents(cmd.orderId);
const aggregate = OrderAggregate.fromHistory(events);
aggregate.confirmOrder();
await this.eventStore.append(
cmd.orderId,
aggregate.uncommittedEvents,
aggregate.version - aggregate.uncommittedEvents.length,
);
}
}
class OrderProjection {
async handle(event: DomainEvent): Promise<void> {
switch (event.type) {
case 'OrderCreated':
await readDb.orders.create({
data: { id: event.data.id, status: 'pending', total: calcTotal(event.data.items) },
});
break;
case 'OrderConfirmed':
await readDb.orders.update({
where: { id: event.aggregateId },
data: { status: 'confirmed', confirmedAt: event.timestamp },
});
break;
}
}
}
async function loadAggregate(id: string): Promise<OrderAggregate> {
const snapshot = await snapshotStore.getLatest(id);
const events = await eventStore.getEvents(id, snapshot?.version ?? 0);
const aggregate = snapshot
? OrderAggregate.fromSnapshot(snapshot)
: new OrderAggregate();
events.forEach((e) => aggregate.apply(e, false));
return aggregate;
}
| Anti-Pattern | Fix | |--------------|-----| | Querying event store for reads | Build read models via projections | | No optimistic concurrency | Check expected version on append | | Deleting events | Events are immutable; use compensating events | | Projections coupled to write model | Projections consume events independently | | No snapshots for long streams | Snapshot every N events (e.g., 100) |
development
Guidelines for building high-performance APIs with Fastify and TypeScript, covering validation, Prisma integration, and testing best practices
development
FastAPI modern Python web framework. Covers routing, Pydantic models, dependency injection, and async support. Use when building Python APIs. USE WHEN: user mentions "fastapi", "pydantic", "async python api", "python rest api", asks about "dependency injection python", "python openapi", "python swagger", "async endpoints", "python api validation", "fastapi middleware" DO NOT USE FOR: Django apps - use `django` instead, Flask apps - use `flask` instead, synchronous Python APIs without type hints, GraphQL-only APIs
tools
FastAPI integration testing specialist. Covers synchronous TestClient, async httpx AsyncClient, dependency injection overrides, auth testing (JWT, OAuth2, API keys), WebSocket testing, file uploads, background tasks, middleware testing, and HTTP mocking with respx, responses, and pytest-httpserver. USE WHEN: user mentions "FastAPI test", "TestClient", "httpx async test", "dependency override test", "respx mock", asks about testing FastAPI endpoints, authentication in tests, or HTTP client mocking. DO NOT USE FOR: Django - use `pytest-django`; pytest internals - use `pytest`; Container infrastructure - use `testcontainers-python`
development
Expert in FastAPI Python development with best practices for APIs and async operations