agents/plugins/backend-development/skills/projection-patterns/SKILL.md
Build read models and projections from event streams. Use when implementing CQRS read sides, building materialized views, or optimizing query performance in event-sourced systems.
npx skillsauth add simplysmartai/5cypressautomation projection-patterns
Install this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
3 of 9 scanners reported clean
Some scanners were skipped, did not run, or reported a non-clean status. Review each row below.
Comprehensive guide to building projections and read models for event-sourced systems.
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│ Event Store │────►│  Projector  │────►│ Read Model  │
│             │     │             │     │ (Database)  │
│ ┌─────────┐ │     │ ┌─────────┐ │     │ ┌─────────┐ │
│ │ Events  │ │     │ │ Handler │ │     │ │ Tables  │ │
│ └─────────┘ │     │ │ Logic   │ │     │ │ Views   │ │
│             │     │ └─────────┘ │     │ │ Cache   │ │
└─────────────┘     └─────────────┘     └─────────────┘
| Type | Description | Use Case |
|------|-------------|----------|
| Live | Real-time from subscription | Current state queries |
| Catchup | Process historical events | Rebuilding read models |
| Persistent | Stores checkpoint | Resume after restart |
| Inline | Same transaction as write | Strong consistency |
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict, Any, Callable, List

import asyncpg
@dataclass
class Event:
    """A single event as read back from the event store.

    Field order is part of the generated positional constructor.
    """

    # Identifier of the stream this event was written to.
    stream_id: str
    # Event type name; matched against a projection's ``handles()`` list.
    event_type: str
    # Event payload; projections read their values from here.
    data: dict
    # NOTE(review): presumably the per-stream version number — confirm
    # against the event store that produces these objects.
    version: int
    # Position saved as the projection checkpoint after processing.
    global_position: int
class Projection(ABC):
    """Contract that every read-model projection implements."""

    @property
    @abstractmethod
    def name(self) -> str:
        """Return the unique projection name used as the checkpoint key."""

    @abstractmethod
    def handles(self) -> List[str]:
        """Return the event type names this projection wants to receive."""

    @abstractmethod
    async def apply(self, event: Event) -> None:
        """Update the read model so that it reflects ``event``."""
class Projector:
    """Feeds events from an event store into registered projections.

    Every projection has its own checkpoint, stored under
    ``projection.name``, so projections advance independently of each other.
    """

    def __init__(self, event_store, checkpoint_store):
        """Keep references to the collaborators.

        Args:
            event_store: Object exposing an awaitable
                ``read_all(position, batch_size)`` that returns events.
            checkpoint_store: Object exposing awaitable ``get(name)``,
                ``save(name, position)`` and ``delete(name)``.
        """
        self.event_store = event_store
        self.checkpoint_store = checkpoint_store
        self.projections: List[Projection] = []

    def register(self, projection: Projection):
        """Add ``projection`` to the set driven by :meth:`run`."""
        self.projections.append(projection)

    async def run(self, batch_size: int = 100):
        """Run all registered projections continuously.

        Never returns on its own: each round processes up to ``batch_size``
        events per projection and then sleeps for 100 ms.
        """
        while True:
            for projection in self.projections:
                await self._run_projection(projection, batch_size)
            # Short pause between polling rounds so the loop does not spin.
            await asyncio.sleep(0.1)

    async def _run_projection(self, projection: Projection, batch_size: int) -> int:
        """Process one batch of events for ``projection``.

        Reads up to ``batch_size`` events starting at the stored checkpoint
        (0 when none exists), applies the ones the projection handles, and
        saves the checkpoint after every event.

        Returns:
            The number of events read in this batch, handled or not.
        """
        checkpoint = await self.checkpoint_store.get(projection.name)
        position = checkpoint or 0
        events = await self.event_store.read_all(position, batch_size)

        # Evaluate the handled types once per batch instead of once per event.
        handled_types = frozenset(projection.handles())

        processed = 0
        for event in events:
            if event.event_type in handled_types:
                await projection.apply(event)
            # The checkpoint advances for unhandled events too; otherwise a
            # batch of unhandled events would be re-read on every call.
            await self.checkpoint_store.save(
                projection.name,
                event.global_position
            )
            processed += 1
        return processed

    async def rebuild(self, projection: Projection, batch_size: int = 1000):
        """Rebuild a projection from scratch.

        Deletes the checkpoint and replays batches until the event store
        returns fewer than ``batch_size`` events, so the whole history is
        replayed rather than only the first batch.

        Args:
            projection: The projection to replay.
            batch_size: Events to read per batch (default 1000).
        """
        await self.checkpoint_store.delete(projection.name)
        # Optionally clear read model tables
        while True:
            processed = await self._run_projection(projection, batch_size)
            # A short (or empty) batch means the end of the log was reached.
            if processed < max(batch_size, 1):
                break
class OrderSummaryProjection(Projection):
    """Keeps the ``order_summaries`` table in step with order events."""

    # Event type -> name of the coroutine method that applies it.
    # Insertion order is the order reported by ``handles()``.
    _HANDLER_NAMES = {
        "OrderCreated": "_handle_created",
        "OrderItemAdded": "_handle_item_added",
        "OrderItemRemoved": "_handle_item_removed",
        "OrderShipped": "_handle_shipped",
        "OrderCompleted": "_handle_completed",
        "OrderCancelled": "_handle_cancelled",
    }

    def __init__(self, db_pool: asyncpg.Pool):
        """Remember the pool from which connections are acquired."""
        self.pool = db_pool

    @property
    def name(self) -> str:
        """Checkpoint key of this projection."""
        return "order_summary"

    def handles(self) -> List[str]:
        """Return the order event types this projection applies."""
        return list(self._HANDLER_NAMES)

    async def apply(self, event: Event) -> None:
        """Route ``event`` to its handler; unknown event types are ignored."""
        method_name = self._HANDLER_NAMES.get(event.event_type)
        if method_name is None:
            return
        await getattr(self, method_name)(event)

    async def _execute(self, sql: str, *params) -> None:
        """Run one statement on a connection acquired from the pool."""
        async with self.pool.acquire() as conn:
            await conn.execute(sql, *params)

    async def _handle_created(self, event: Event):
        """Insert the summary row: status 'pending', zero total, zero items."""
        payload = event.data
        await self._execute(
            """
            INSERT INTO order_summaries
            (order_id, customer_id, status, total_amount, item_count, created_at)
            VALUES ($1, $2, $3, $4, $5, $6)
            """,
            payload['order_id'],
            payload['customer_id'],
            'pending',
            0,
            0,
            payload['created_at'],
        )

    async def _handle_item_added(self, event: Event):
        """Add price * quantity to the total and bump the item count by one."""
        payload = event.data
        await self._execute(
            """
            UPDATE order_summaries
            SET total_amount = total_amount + $2,
            item_count = item_count + 1,
            updated_at = NOW()
            WHERE order_id = $1
            """,
            payload['order_id'],
            payload['price'] * payload['quantity'],
        )

    async def _handle_item_removed(self, event: Event):
        """Subtract price * quantity from the total and drop the count by one."""
        payload = event.data
        await self._execute(
            """
            UPDATE order_summaries
            SET total_amount = total_amount - $2,
            item_count = item_count - 1,
            updated_at = NOW()
            WHERE order_id = $1
            """,
            payload['order_id'],
            payload['price'] * payload['quantity'],
        )

    async def _handle_shipped(self, event: Event):
        """Mark the order 'shipped' and record the shipping timestamp."""
        payload = event.data
        await self._execute(
            """
            UPDATE order_summaries
            SET status = 'shipped',
            shipped_at = $2,
            updated_at = NOW()
            WHERE order_id = $1
            """,
            payload['order_id'],
            payload['shipped_at'],
        )

    async def _handle_completed(self, event: Event):
        """Mark the order 'completed' and record the completion timestamp."""
        payload = event.data
        await self._execute(
            """
            UPDATE order_summaries
            SET status = 'completed',
            completed_at = $2,
            updated_at = NOW()
            WHERE order_id = $1
            """,
            payload['order_id'],
            payload['completed_at'],
        )

    async def _handle_cancelled(self, event: Event):
        """Mark the order 'cancelled'; the reason is optional in the payload."""
        payload = event.data
        await self._execute(
            """
            UPDATE order_summaries
            SET status = 'cancelled',
            cancelled_at = $2,
            cancellation_reason = $3,
            updated_at = NOW()
            WHERE order_id = $1
            """,
            payload['order_id'],
            payload['cancelled_at'],
            payload.get('reason'),
        )
from elasticsearch import AsyncElasticsearch
class ProductSearchProjection(Projection):
    """Mirrors product events into the ``products`` Elasticsearch index."""

    def __init__(self, es_client: AsyncElasticsearch):
        """Remember the client and the target index name."""
        self.es = es_client
        self.index = "products"

    @property
    def name(self) -> str:
        """Checkpoint key of this projection."""
        return "product_search"

    def handles(self) -> List[str]:
        """Return the product event types this projection applies."""
        return [
            "ProductCreated",
            "ProductUpdated",
            "ProductPriceChanged",
            "ProductDeleted",
        ]

    async def apply(self, event: Event) -> None:
        """Index, update or delete the product document named by ``event``.

        Event types other than the four handled ones are ignored.
        """
        kind = event.event_type
        payload = event.data

        if kind == "ProductCreated":
            # Full document: every searchable field plus the creation time.
            doc_id = payload['product_id']
            document = {
                'name': payload['name'],
                'description': payload['description'],
                'category': payload['category'],
                'price': payload['price'],
                'tags': payload.get('tags', []),
                'created_at': payload['created_at'],
            }
            await self.es.index(index=self.index, id=doc_id, document=document)
            return

        if kind == "ProductUpdated":
            # Partial update of the descriptive fields; price is not touched.
            doc_id = payload['product_id']
            changes = {
                'name': payload['name'],
                'description': payload['description'],
                'category': payload['category'],
                'tags': payload.get('tags', []),
                'updated_at': payload['updated_at'],
            }
            await self.es.update(index=self.index, id=doc_id, doc=changes)
            return

        if kind == "ProductPriceChanged":
            # Partial update of the price fields only.
            doc_id = payload['product_id']
            changes = {
                'price': payload['new_price'],
                'price_updated_at': payload['changed_at'],
            }
            await self.es.update(index=self.index, id=doc_id, doc=changes)
            return

        if kind == "ProductDeleted":
            await self.es.delete(index=self.index, id=payload['product_id'])
class DailySalesProjection(Projection):
    """Maintains per-day sales totals in the ``daily_sales`` table."""

    def __init__(self, db_pool: asyncpg.Pool):
        """Remember the pool from which connections are acquired."""
        self.pool = db_pool

    @property
    def name(self) -> str:
        """Checkpoint key of this projection."""
        return "daily_sales"

    def handles(self) -> List[str]:
        """Return the two event types that affect daily totals."""
        return ["OrderCompleted", "OrderRefunded"]

    async def apply(self, event: Event) -> None:
        """Add a completed order to, or take a refund out of, its day's totals."""
        routes = {
            "OrderCompleted": self._increment_sales,
            "OrderRefunded": self._decrement_sales,
        }
        handler = routes.get(event.event_type)
        if handler is not None:
            await handler(event)

    async def _increment_sales(self, event: Event):
        """Upsert the day's row, adding one order plus its revenue and items."""
        payload = event.data
        # First ten characters of the timestamp.
        # NOTE(review): assumes an ISO-8601 string (YYYY-MM-DD...) — confirm.
        day = payload['completed_at'][:10]
        sql = """
            INSERT INTO daily_sales (date, total_orders, total_revenue, total_items)
            VALUES ($1, 1, $2, $3)
            ON CONFLICT (date) DO UPDATE SET
            total_orders = daily_sales.total_orders + 1,
            total_revenue = daily_sales.total_revenue + $2,
            total_items = daily_sales.total_items + $3,
            updated_at = NOW()
            """
        async with self.pool.acquire() as conn:
            await conn.execute(
                sql,
                day,
                payload['total_amount'],
                payload['item_count'],
            )

    async def _decrement_sales(self, event: Event):
        """Book a refund against the day the order was originally completed."""
        payload = event.data
        day = payload['original_completed_at'][:10]
        sql = """
            UPDATE daily_sales SET
            total_orders = total_orders - 1,
            total_revenue = total_revenue - $2,
            total_refunds = total_refunds + $2,
            updated_at = NOW()
            WHERE date = $1
            """
        async with self.pool.acquire() as conn:
            await conn.execute(sql, day, payload['refund_amount'])
class CustomerActivityProjection(Projection):
    """Keeps the customer tables in step with customer-related events.

    Writes to ``customers``, ``customer_activity_summary`` and
    ``customer_order_history``; all statements for one event share a single
    transaction.
    """

    def __init__(self, db_pool: asyncpg.Pool):
        """Remember the pool from which connections are acquired."""
        self.pool = db_pool

    @property
    def name(self) -> str:
        """Checkpoint key of this projection."""
        return "customer_activity"

    def handles(self) -> List[str]:
        """Return the event types this projection applies."""
        return [
            "CustomerCreated",
            "OrderCompleted",
            "ReviewSubmitted",
            "CustomerTierChanged",
        ]

    async def apply(self, event: Event) -> None:
        """Apply ``event`` inside one transaction on a pooled connection.

        The connection and transaction are opened for every event, including
        event types with no matching step, which then do nothing.
        """
        steps = {
            "CustomerCreated": self._on_customer_created,
            "OrderCompleted": self._on_order_completed,
            "ReviewSubmitted": self._on_review_submitted,
            "CustomerTierChanged": self._on_tier_changed,
        }
        async with self.pool.acquire() as conn, conn.transaction():
            step = steps.get(event.event_type)
            if step is not None:
                await step(conn, event.data)

    async def _on_customer_created(self, conn, payload) -> None:
        """Insert the customer (tier 'bronze') and a zeroed activity summary."""
        customer_id = payload['customer_id']
        await conn.execute(
            """
            INSERT INTO customers (customer_id, email, name, tier, created_at)
            VALUES ($1, $2, $3, 'bronze', $4)
            """,
            customer_id,
            payload['email'],
            payload['name'],
            payload['created_at'],
        )
        await conn.execute(
            """
            INSERT INTO customer_activity_summary
            (customer_id, total_orders, total_spent, total_reviews)
            VALUES ($1, 0, 0, 0)
            """,
            payload['customer_id'],
        )

    async def _on_order_completed(self, conn, payload) -> None:
        """Bump the activity summary and append a row to the order history."""
        await conn.execute(
            """
            UPDATE customer_activity_summary SET
            total_orders = total_orders + 1,
            total_spent = total_spent + $2,
            last_order_at = $3
            WHERE customer_id = $1
            """,
            payload['customer_id'],
            payload['total_amount'],
            payload['completed_at'],
        )
        await conn.execute(
            """
            INSERT INTO customer_order_history
            (customer_id, order_id, amount, completed_at)
            VALUES ($1, $2, $3, $4)
            """,
            payload['customer_id'],
            payload['order_id'],
            payload['total_amount'],
            payload['completed_at'],
        )

    async def _on_review_submitted(self, conn, payload) -> None:
        """Count the review and record when it was submitted."""
        await conn.execute(
            """
            UPDATE customer_activity_summary SET
            total_reviews = total_reviews + 1,
            last_review_at = $2
            WHERE customer_id = $1
            """,
            payload['customer_id'],
            payload['submitted_at'],
        )

    async def _on_tier_changed(self, conn, payload) -> None:
        """Store the customer's new tier."""
        await conn.execute(
            """
            UPDATE customers SET tier = $2, updated_at = NOW()
            WHERE customer_id = $1
            """,
            payload['customer_id'],
            payload['new_tier'],
        )
content-media
This skill should be used when the user asks to "plan team structure", "determine hiring needs", "design org chart", "calculate compensation", "plan equity allocation", or requests organizational design and headcount planning for a startup.
development
This skill should be used when the user asks about "key startup metrics", "SaaS metrics", "CAC and LTV", "unit economics", "burn multiple", "rule of 40", "marketplace metrics", or requests guidance on tracking and optimizing business performance metrics.
development
This skill should be used when the user asks to "create financial projections", "build a financial model", "forecast revenue", "calculate burn rate", "estimate runway", "model cash flow", or requests 3-5 year financial planning for a startup.
research
This skill should be used when the user asks to "calculate TAM", "determine SAM", "estimate SOM", "size the market", "calculate market opportunity", "what's the total addressable market", or requests market sizing analysis for a startup or business opportunity.