skills/go-domain-events/SKILL.md
Go Domain Events 實作:事件定義、發布模式、Event Bus、Outbox Pattern、冪等處理、 Event Sourcing 基礎、非同步處理。 **適用場景**:DDD Domain Events、實作 Event Bus、Outbox Pattern、冪等性設計、 非同步事件處理、微服務溝通、Event Sourcing、解耦業務邏輯。
npx skillsauth add vincent119/ai-rules-kit go-domain-eventsInstall this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
3 of 9 scanners reported clean
Some scanners were skipped, did not run, or reported a non-clean status. Review each row below.
相關 Skills:本規範建議搭配
go-ddd(領域驅動設計)與go-database(Outbox Pattern)
| 類型 | 時態 | 範例 | 可拒絕 | |--------|--------|---------------------------|--------| | 命令 | 未來式 | CreateUser、PlaceOrder | 是 | | 事件 | 過去式 | UserCreated、OrderPlaced | 否 |
package domain
import (
"time"
"github.com/google/uuid"
)
// DomainEvent 介面
type DomainEvent interface {
EventID() string
EventType() string
AggregateID() string
OccurredAt() time.Time
}
// BaseEvent 基礎實作
type BaseEvent struct {
ID string `json:"id"`
Type string `json:"type"`
AggrID string `json:"aggregate_id"`
Timestamp time.Time `json:"occurred_at"`
Version int `json:"version"`
}
func NewBaseEvent(eventType string, aggregateID string, version int) BaseEvent {
return BaseEvent{
ID: uuid.New().String(),
Type: eventType,
AggrID: aggregateID,
Timestamp: time.Now(),
Version: version,
}
}
func (e BaseEvent) EventID() string { return e.ID }
func (e BaseEvent) EventType() string { return e.Type }
func (e BaseEvent) AggregateID() string { return e.AggrID }
func (e BaseEvent) OccurredAt() time.Time { return e.Timestamp }
// UserCreated 事件
type UserCreated struct {
BaseEvent
Email string `json:"email"`
Name string `json:"name"`
}
func NewUserCreated(userID string, email string, name string) *UserCreated {
return &UserCreated{
BaseEvent: NewBaseEvent("UserCreated", userID, 1),
Email: email,
Name: name,
}
}
// OrderPlaced 事件
type OrderPlaced struct {
BaseEvent
OrderID string `json:"order_id"`
CustomerID string `json:"customer_id"`
Amount float64 `json:"amount"`
}
func NewOrderPlaced(orderID string, customerID string, amount float64) *OrderPlaced {
return &OrderPlaced{
BaseEvent: NewBaseEvent("OrderPlaced", orderID, 1),
OrderID: orderID,
CustomerID: customerID,
Amount: amount,
}
}
package eventbus
import (
"context"
"sync"
)
type EventHandler func(ctx context.Context, event DomainEvent) error
type EventBus struct {
mu sync.RWMutex
handlers map[string][]EventHandler
}
func NewEventBus() *EventBus {
return &EventBus{
handlers: make(map[string][]EventHandler),
}
}
// Subscribe 註冊事件處理器
func (eb *EventBus) Subscribe(eventType string, handler EventHandler) {
eb.mu.Lock()
defer eb.mu.Unlock()
eb.handlers[eventType] = append(eb.handlers[eventType], handler)
}
// Publish 發布事件
func (eb *EventBus) Publish(ctx context.Context, event DomainEvent) error {
eb.mu.RLock()
handlers := eb.handlers[event.EventType()]
eb.mu.RUnlock()
for _, handler := range handlers {
// 同步處理(簡單場景)
if err := handler(ctx, event); err != nil {
return fmt.Errorf("handler failed: %w", err)
}
}
return nil
}
func main() {
bus := eventbus.NewEventBus()
// 註冊 Handler
bus.Subscribe("UserCreated", func(ctx context.Context, event DomainEvent) error {
e := event.(*UserCreated)
log.Printf("User created: %s (%s)", e.Name, e.Email)
return nil
})
bus.Subscribe("UserCreated", func(ctx context.Context, event DomainEvent) error {
e := event.(*UserCreated)
// 發送歡迎信
return emailService.SendWelcomeEmail(e.Email)
})
// 發布事件
event := NewUserCreated("123", "[email protected]", "John")
if err := bus.Publish(context.Background(), event); err != nil {
log.Fatal(err)
}
}
問題:資料庫事務與訊息發布不是原子操作
// ❌ 問題:若 Publish 失敗,使用者已建立但事件未發出
tx.Exec("INSERT INTO users ...")
tx.Commit()
eventBus.Publish(userCreatedEvent) // 可能失敗
解決方案:將事件寫入資料庫(Outbox Table),再由後台程序發送
CREATE TABLE outbox_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
event_type VARCHAR(100) NOT NULL,
aggregate_id VARCHAR(100) NOT NULL,
payload JSONB NOT NULL,
occurred_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
published_at TIMESTAMPTZ,
retry_count INT DEFAULT 0,
error TEXT
);
CREATE INDEX idx_outbox_events_unpublished
ON outbox_events (occurred_at)
WHERE published_at IS NULL;
func (r *UserRepository) Create(ctx context.Context, tx *sql.Tx, user *User) error {
// 1. 插入 User
_, err := tx.ExecContext(ctx, `
INSERT INTO users (id, email, name) VALUES ($1, $2, $3)
`, user.ID, user.Email, user.Name)
if err != nil {
return fmt.Errorf("insert user: %w", err)
}
// 2. 寫入 Outbox Event(同一個 Transaction)
event := NewUserCreated(user.ID, user.Email, user.Name)
payload, _ := json.Marshal(event)
_, err = tx.ExecContext(ctx, `
INSERT INTO outbox_events (event_type, aggregate_id, payload, occurred_at)
VALUES ($1, $2, $3, $4)
`, event.EventType(), event.AggregateID(), payload, event.OccurredAt())
if err != nil {
return fmt.Errorf("insert outbox: %w", err)
}
return nil
}
type OutboxPublisher struct {
db *sql.DB
publisher EventPublisher // 例如 Kafka、RabbitMQ
logger *zap.Logger
}
func (op *OutboxPublisher) Run(ctx context.Context) {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if err := op.processOutbox(ctx); err != nil {
op.logger.Error("process outbox failed", zap.Error(err))
}
}
}
}
func (op *OutboxPublisher) processOutbox(ctx context.Context) error {
// 1. 查詢未發布的事件(限制筆數避免記憶體爆炸)
rows, err := op.db.QueryContext(ctx, `
SELECT id, event_type, payload
FROM outbox_events
WHERE published_at IS NULL
ORDER BY occurred_at
LIMIT 100
FOR UPDATE SKIP LOCKED
`)
if err != nil {
return fmt.Errorf("query outbox: %w", err)
}
defer rows.Close()
for rows.Next() {
var id string
var eventType string
var payload []byte
if err := rows.Scan(&id, &eventType, &payload); err != nil {
return fmt.Errorf("scan row: %w", err)
}
// 2. 發布事件到 Message Queue
if err := op.publisher.Publish(ctx, eventType, payload); err != nil {
// 更新重試次數與錯誤訊息
op.db.ExecContext(ctx, `
UPDATE outbox_events
SET retry_count = retry_count + 1, error = $1
WHERE id = $2
`, err.Error(), id)
continue
}
// 3. 標記為已發布
_, err := op.db.ExecContext(ctx, `
UPDATE outbox_events SET published_at = NOW() WHERE id = $1
`, id)
if err != nil {
return fmt.Errorf("update outbox: %w", err)
}
}
return nil
}
問題:同一事件可能被處理多次(網路重試、系統故障)
// Event 包含冪等性 Key
type UserCreated struct {
BaseEvent
Email string `json:"email"`
Name string `json:"name"`
}
func (e *UserCreated) IdempotencyKey() string {
// 使用 EventID 作為冪等性 Key
return e.EventID()
}
CREATE TABLE processed_events (
event_id VARCHAR(100) PRIMARY KEY,
processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_processed_events_processed_at ON processed_events (processed_at);
func (h *UserEventHandler) Handle(ctx context.Context, event *UserCreated) error {
// 1. 檢查是否已處理
var exists bool
err := h.db.QueryRowContext(ctx, `
SELECT EXISTS(SELECT 1 FROM processed_events WHERE event_id = $1)
`, event.EventID()).Scan(&exists)
if err != nil {
return fmt.Errorf("check processed: %w", err)
}
if exists {
h.logger.Info("event already processed", zap.String("event_id", event.EventID()))
return nil // 冪等:直接返回成功
}
// 2. 處理事件
tx, err := h.db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("begin tx: %w", err)
}
defer tx.Rollback()
// 業務邏輯
if err := h.sendWelcomeEmail(ctx, event.Email); err != nil {
return fmt.Errorf("send email: %w", err)
}
// 3. 記錄為已處理(同一個 Transaction)
_, err = tx.ExecContext(ctx, `
INSERT INTO processed_events (event_id) VALUES ($1)
`, event.EventID())
if err != nil {
return fmt.Errorf("insert processed: %w", err)
}
return tx.Commit()
}
Event Sourcing:不儲存當前狀態,而是儲存所有歷史事件,透過重播事件重建狀態
CREATE TABLE event_store (
sequence_number BIGSERIAL PRIMARY KEY,
stream_id VARCHAR(100) NOT NULL,
event_type VARCHAR(100) NOT NULL,
event_data JSONB NOT NULL,
metadata JSONB,
occurred_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_event_store_stream_id ON event_store (stream_id, sequence_number);
func (es *EventStore) Append(ctx context.Context, streamID string, event DomainEvent) error {
payload, _ := json.Marshal(event)
_, err := es.db.ExecContext(ctx, `
INSERT INTO event_store (stream_id, event_type, event_data, occurred_at)
VALUES ($1, $2, $3, $4)
`, streamID, event.EventType(), payload, event.OccurredAt())
return err
}
func (es *EventStore) Replay(ctx context.Context, streamID string) ([]DomainEvent, error) {
rows, err := es.db.QueryContext(ctx, `
SELECT event_type, event_data
FROM event_store
WHERE stream_id = $1
ORDER BY sequence_number
`, streamID)
if err != nil {
return nil, err
}
defer rows.Close()
var events []DomainEvent
for rows.Next() {
var eventType string
var payload []byte
if err := rows.Scan(&eventType, &payload); err != nil {
return nil, err
}
// 反序列化事件(需要 Event Registry)
event, err := deserializeEvent(eventType, payload)
if err != nil {
return nil, err
}
events = append(events, event)
}
return events, nil
}
事件定義
Event Bus
Outbox Pattern
FOR UPDATE SKIP LOCKED 避免競爭冪等性
Event Sourcing
測試
tools
基於 SLA/SLO 量化評估事故影響的計算模型與業務影響矩陣。適用於「SLA 影響」、「SLO 違反」、「影響評估」、「營收損失估算」、「Error Budget」、「可用性計算」、「事故成本評估」等量化事故業務影響的任務。強化 impact-assessor 的評估能力。注意:事故原因分析與改善規劃不在此技能範圍內。
research
根因分析(RCA)方法論詳細指南。提供 5 Whys、Fishbone 圖、Fault Tree Analysis、變更分析等結構化 RCA 技術,以及認知偏誤防範清單。適用於「根因分析」、「RCA」、「5 Whys」、「魚骨圖」、「Fault Tree」、「原因分析方法論」、「變更分析」等事故原因分析任務。強化 root-cause-investigator 的分析能力。注意:時間軸重建與改善規劃不在此技能範圍內。
testing
事故事後分析(Postmortem)完整流程。協調 7 個執行階段:資訊收集 → 時間軸重建 → 根因分析 → 影響評估 → 改善規劃 → 報告審查 → 整合報告,最終產出完整的 Postmortem 報告。適用於「寫事故報告」、「post-incident 分析」、「RCA 報告」、「事故時間軸整理」、「建立改善措施」等請求。注意:即時 Incident Response(on-call)、監控系統設定、告警配置不在此技能範圍內。
content-media
投影片版面模式庫。提供 20 種投影片類型的最佳版面配置、格線系統、色彩與字型設計 Token。適用於「投影片版面」、「Slide Layout」、「設計系統」、「格線」、「字型」、「色彩規範」等投影片視覺設計任務。強化 visual-designer 的設計能力。注意:PPT/Keynote 檔案直接輸出不在此技能範圍內。