dot_claude/skills/data-pipeline-engineering/SKILL.md
Use when building or modifying any data pipeline, before writing transformation logic - idempotent-first approach covering schema design, quality checks, incremental loads, CDC, and observability that ensures every step is repeatable and verifiable
Data pipelines fail silently. Bad data propagates downstream before anyone notices. Reprocessing a non-idempotent pipeline corrupts your warehouse.
Core principle: EVERY PIPELINE STEP IS IDEMPOTENT AND PRODUCES VERIFIABLE OUTPUT. If you can't safely re-run any step at any time, your pipeline is a ticking bomb.
Violating the letter of this process is violating the spirit of data engineering.
EVERY PIPELINE STEP IS IDEMPOTENT AND PRODUCES VERIFIABLE OUTPUT
If re-running a step produces different results or duplicates data, you cannot ship it.
Always:
Use this ESPECIALLY when:
Don't skip when:
BEFORE writing any transformation logic:
Understand the Source
Choose Your Modeling Approach
| Pattern | Use When | Characteristics |
|---------|----------|-----------------|
| Star schema | BI/analytics, known query patterns | Denormalized facts + dimensions, fast reads |
| Snowflake schema | Normalized dimensions needed | Normalized dimensions, less storage |
| Data vault | Multiple sources, audit requirements | Hubs + links + satellites, full history |
| Wide tables | Simple analytics, small data | Single denormalized table, easy queries |
Define Layers
```
raw/staging  → Exact copy of source (no transforms)
cleaned      → Type casting, dedup, null handling
transformed  → Business logic, joins, aggregations
presentation → Consumer-ready marts and views
```
Each layer has a purpose. Don't skip layers. Don't combine them.
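As a rough illustration of what the cleaned layer does (type casting, dedup, null handling) without any business logic, here is a stdlib-only sketch. The row shape, the column names, and the "keep the latest `updated_at`" dedup rule are assumptions for illustration, not part of this skill.

```python
from datetime import datetime
from decimal import Decimal


def clean_orders(raw_rows):
    """Cleaned layer: cast types, reject rows missing the key, dedup by order_id.

    raw_rows: list of dicts with string values, copied verbatim from the raw layer.
    Returns (cleaned_rows, rejected_rows) so nothing is dropped silently.
    """
    latest = {}
    rejected = []
    for row in raw_rows:
        if not row.get("order_id"):
            rejected.append(row)  # null/empty key: quarantine instead of discarding
            continue
        cleaned = {
            "order_id": row["order_id"].strip(),
            # Null handling: a missing amount stays None rather than becoming 0
            "amount": Decimal(row["amount"]) if row.get("amount") else None,
            "updated_at": datetime.fromisoformat(row["updated_at"]),
        }
        # Dedup: keep the most recently updated version of each order
        prev = latest.get(cleaned["order_id"])
        if prev is None or cleaned["updated_at"] > prev["updated_at"]:
            latest[cleaned["order_id"]] = cleaned
    return sorted(latest.values(), key=lambda r: r["order_id"]), rejected
```

Returning the rejected rows alongside the cleaned ones keeps the "never silently drop" rule enforceable at the very first layer.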
Design for Change
This is non-negotiable. Every step must be safely re-runnable.
DELETE-INSERT (simplest, preferred for small-medium data):
```sql
-- Idempotent: safe to re-run for any date
BEGIN;  -- one transaction, so readers never see a half-loaded partition
DELETE FROM orders_fact WHERE order_date = '{{ ds }}';
INSERT INTO orders_fact
SELECT ... FROM staging_orders WHERE order_date = '{{ ds }}';
COMMIT;
```
MERGE/UPSERT (for large datasets where delete-insert is too slow):
```sql
MERGE INTO dim_customers AS target
USING staging_customers AS source
ON target.business_key = source.customer_id
WHEN MATCHED THEN UPDATE SET ...
WHEN NOT MATCHED THEN INSERT ...;
```
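Not every engine supports `MERGE`. As one hedged example, SQLite (3.24+) expresses the same upsert with `INSERT ... ON CONFLICT DO UPDATE`. The sketch below uses Python's built-in `sqlite3` and assumes `business_key` carries a unique constraint, which the conflict target requires; the table and column names are illustrative.

```python
import sqlite3


def upsert_customers(conn, staging_rows):
    """Idempotent upsert: re-running with the same rows leaves the table unchanged."""
    with conn:  # one transaction: commit on success, roll back on error
        conn.executemany(
            """
            INSERT INTO dim_customers (business_key, name, email)
            VALUES (:customer_id, :name, :email)
            ON CONFLICT (business_key) DO UPDATE SET
                name = excluded.name,
                email = excluded.email
            """,
            staging_rows,
        )


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE dim_customers (business_key TEXT PRIMARY KEY, name TEXT, email TEXT)"
)
rows = [
    {"customer_id": "C1", "name": "Ada", "email": "ada@example.com"},
    {"customer_id": "C2", "name": "Lin", "email": "lin@example.com"},
]
upsert_customers(conn, rows)
upsert_customers(conn, rows)  # second run: still two rows, no duplicates
```

The matching condition lives in the unique constraint rather than in application code, so a re-run cannot create a second row for the same business key.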
TABLE SWAP (for full rebuilds):
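```sql
DROP TABLE IF EXISTS orders_fact_new;  -- clean up after a previous failed run
CREATE TABLE orders_fact_new AS SELECT ...;
-- Swap: atomic only inside one transaction on engines with transactional DDL
-- (e.g. PostgreSQL); Snowflake offers ALTER TABLE ... SWAP WITH instead
BEGIN;
ALTER TABLE orders_fact RENAME TO orders_fact_old;
ALTER TABLE orders_fact_new RENAME TO orders_fact;
DROP TABLE orders_fact_old;
COMMIT;
```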
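To exercise a full rebuild end to end, here is a sketch using Python's `sqlite3`, where DDL can run inside an explicit transaction. The helper name `rebuild_orders_fact` is hypothetical, and `select_sql` is assumed to be trusted, pipeline-defined SQL (it is formatted into the statement, not bound as a parameter).

```python
import sqlite3


def rebuild_orders_fact(conn, select_sql):
    """Full rebuild via table swap; safe to re-run after success or failure.

    conn must be opened with isolation_level=None so BEGIN/COMMIT are explicit.
    select_sql is trusted pipeline SQL, not user input.
    """
    conn.execute("BEGIN")
    try:
        conn.execute("DROP TABLE IF EXISTS orders_fact_new")  # leftover from a failed run
        conn.execute(f"CREATE TABLE orders_fact_new AS {select_sql}")
        conn.execute("DROP TABLE IF EXISTS orders_fact_old")
        conn.execute("ALTER TABLE orders_fact RENAME TO orders_fact_old")
        conn.execute("ALTER TABLE orders_fact_new RENAME TO orders_fact")
        conn.execute("DROP TABLE orders_fact_old")
        conn.execute("COMMIT")
    except Exception:
        conn.execute("ROLLBACK")  # readers never observe a half-swapped state
        raise
```

Usage: open the connection with `sqlite3.connect(path, isolation_level=None)` and call `rebuild_orders_fact(conn, "SELECT ... FROM staging_orders")`; running it twice leaves exactly one `orders_fact` and no leftover tables.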
| Anti-Pattern | Why It Breaks | Fix |
|-------------|---------------|-----|
| INSERT without dedup check | Duplicates on re-run | DELETE-INSERT or MERGE |
| Auto-increment surrogate keys | Different IDs on re-run | Hash-based or natural keys for matching |
| NOW() in transforms | Different results on re-run | Pass execution timestamp as parameter |
| Appending to files | Duplicates on re-run | Write to partitioned paths, overwrite partition |
| Sequence-dependent operations | Order matters across runs | Make each step independent |
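For the surrogate-key row above, a deterministic key can be derived by hashing the natural-key columns, so every re-run produces the same ID. A minimal sketch; the separator, the `<NULL>` sentinel, SHA-256, and truncation to 16 hex characters are all assumptions, not a standard.

```python
import hashlib

NULL_SENTINEL = "<NULL>"  # assumption: keeps None distinct from the empty string
SEPARATOR = "\x1f"        # ASCII unit separator, unlikely to appear in key values


def surrogate_key(*natural_key_parts):
    """Deterministic surrogate key: the same natural key always yields the same ID."""
    normalized = SEPARATOR.join(
        NULL_SENTINEL if part is None else str(part) for part in natural_key_parts
    )
    # 16 hex chars = 64 bits; widen it if collision risk matters at your table size
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()[:16]
```

Because values pass through `str()`, cast key columns to a consistent type before hashing (for example, `42` and `"42"` produce the same key).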
Test idempotency explicitly: Run the pipeline twice for the same input. Output must be identical.
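One way to make the run-twice check concrete is a test that executes a step two times against a scratch database and compares output snapshots. The sketch uses `sqlite3` with the DELETE-INSERT pattern; `load_orders_for_day`, `assert_idempotent`, and the table layout are hypothetical.

```python
import sqlite3


def load_orders_for_day(conn, ds):
    """DELETE-INSERT for one date partition, wrapped in a single transaction."""
    with conn:
        conn.execute("DELETE FROM orders_fact WHERE order_date = ?", (ds,))
        conn.execute(
            """
            INSERT INTO orders_fact (order_id, order_date, total)
            SELECT order_id, order_date, quantity * price
            FROM staging_orders WHERE order_date = ?
            """,
            (ds,),
        )


def snapshot(conn):
    # Deterministic ordering so two snapshots are directly comparable
    return conn.execute("SELECT * FROM orders_fact ORDER BY order_id").fetchall()


def assert_idempotent(conn, ds):
    load_orders_for_day(conn, ds)
    first = snapshot(conn)
    load_orders_for_day(conn, ds)  # re-run for the same input
    if snapshot(conn) != first:
        raise AssertionError(f"re-running {ds} changed the output")
    return first
```

Running this in CI against a small fixture catches non-idempotent steps (plain INSERTs, `NOW()` calls) before they reach the warehouse.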
Default to incremental. Fall back to full only when necessary.
Load only what changed since last run.
Requirements:
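updated_at, sequence number, CDC log

```python
# Watermark-based incremental (helpers are pipeline-specific; new_data: pandas DataFrame)
last_watermark = get_watermark('orders_pipeline')
new_data = extract(
    "SELECT * FROM orders WHERE updated_at > %(wm)s",
    params={'wm': last_watermark},  # bind the value instead of formatting it into SQL
)
if not new_data.empty:  # an empty batch must not move the watermark
    load(new_data)  # idempotent write (MERGE / DELETE-INSERT), committed first
    set_watermark('orders_pipeline', new_data['updated_at'].max())
```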
WARNING: If the source doesn't have a reliable updated_at, you CANNOT do incremental safely. Use CDC or full load.
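Even with a reliable `updated_at`, a strict `updated_at > watermark` filter can miss rows whose transaction committed after the previous extraction but carries an earlier timestamp. A common mitigation, sketched here with in-memory stand-ins for source, target, and state store, is to re-read a small lookback window and let an idempotent keyed load absorb the overlap. The two-hour lookback and the function name are assumptions.

```python
from datetime import datetime, timedelta

LOOKBACK = timedelta(hours=2)  # assumption: tune to the source's maximum commit lag


def incremental_run(source_rows, target, state, pipeline="orders_pipeline"):
    """One incremental run over in-memory stand-ins.

    source_rows: list of dicts with 'order_id' and 'updated_at' (datetime)
    target: dict keyed by order_id (keyed writes make the overlap idempotent)
    state: dict holding the watermark per pipeline
    """
    watermark = state.get(pipeline, datetime.min)
    # Re-read a lookback window; skip the subtraction on the very first run
    window_start = watermark - LOOKBACK if watermark != datetime.min else watermark
    batch = [r for r in source_rows if r["updated_at"] > window_start]
    if not batch:
        return 0  # empty batch: leave the watermark where it is
    for row in batch:
        target[row["order_id"]] = row  # upsert by key, so re-read rows just overwrite
    # Never move the watermark backwards
    state[pipeline] = max(watermark, max(r["updated_at"] for r in batch))
    return len(batch)
```

The overlap costs some re-reading on every run; in exchange, a late-committed row is picked up on the next run instead of being lost.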
Use CDC when:
CDC approaches:
| Method | Latency | Source Impact | Complexity |
|--------|---------|---------------|------------|
| Log-based (Debezium, DMS) | Seconds | Minimal | Medium |
| Trigger-based | Seconds | High | High |
| Timestamp-based polling | Minutes | Low | Low |
| Diff/compare | Hours | High | Low |

Prefer log-based CDC. It captures all changes (including deletes) with minimal impact on source performance.
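Log-based tools emit a stream of change events that still has to be applied to the target. A minimal sketch of that apply step, assuming each event carries a primary key, an operation code (`c`/`u`/`d`/`r`, mirroring Debezium's create/update/delete/snapshot-read codes), and a monotonically increasing source position such as an LSN. The event dict shape is an assumption.

```python
def apply_cdc_events(target, applied_pos, events):
    """Apply change events to a keyed target idempotently.

    target: dict pk -> row; applied_pos: dict pk -> last applied source position.
    events: iterable of dicts with 'pk', 'op', 'pos', and 'after' (new row or None).
    """
    for ev in sorted(events, key=lambda e: e["pos"]):
        pk, pos = ev["pk"], ev["pos"]
        if pos <= applied_pos.get(pk, -1):
            continue  # already applied: replay or duplicate delivery
        if ev["op"] == "d":
            target.pop(pk, None)  # deletes propagate instead of being missed
        elif ev["op"] in ("c", "u", "r"):
            target[pk] = ev["after"]
        else:
            raise ValueError(f"unknown op {ev['op']!r}")
        # Position is kept even after a delete, so a replayed create can't resurrect the row
        applied_pos[pk] = pos
```

Tracking the last applied position per key is what makes at-least-once delivery safe: replaying the whole stream leaves the target unchanged.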
Use full load ONLY when:
Even with full load, make it idempotent: table swap or delete-insert.
Every pipeline step produces verifiable output. No exceptions.
```
Source checks  → Before extraction: Is source available? Schema matches?
Load checks    → After extraction: Row counts, null rates, type conformance
Business rules → After transform: Domain constraints, referential integrity
Output checks  → Before serving: Completeness, freshness, accuracy
```
ALWAYS implement these. Non-negotiable:
| Check | Example | Action on Failure |
|-------|---------|-------------------|
| Not null | id IS NOT NULL | Block pipeline |
| Unique | COUNT(id) = COUNT(DISTINCT id) | Block pipeline |
| Referential integrity | All order.customer_id exist in dim_customers | Block or quarantine |
| Row count threshold | Today's count within 20% of yesterday | Alert, review |
| Freshness | Source data within expected window | Alert, review |
| Accepted values | status IN ('active', 'inactive', 'pending') | Quarantine bad rows |
| Value range | price > 0 AND price < 1000000 | Quarantine bad rows |
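Outside dbt or Great Expectations, several of these checks reduce to a few lines over a batch. This stdlib sketch covers not-null, unique, and the row-count threshold from the table, returning each failure labelled with its action. The 20% tolerance mirrors the table; the function name and result shape are assumptions.

```python
def run_checks(rows, key, previous_count, tolerance=0.20):
    """Return a list of (check, action, detail) failures for one batch of dict rows."""
    failures = []
    keys = [r.get(key) for r in rows]
    nulls = sum(k is None for k in keys)
    if nulls:
        failures.append(("not_null", "block", f"{nulls} null {key} values"))
    non_null = [k for k in keys if k is not None]
    duplicates = len(non_null) - len(set(non_null))
    if duplicates:
        failures.append(("unique", "block", f"{duplicates} duplicate {key} values"))
    if previous_count:
        delta = abs(len(rows) - previous_count) / previous_count
        if delta > tolerance:
            failures.append(("row_count", "alert", f"count changed {delta:.0%} vs previous run"))
    return failures
```

An orchestrator would stop the run if any failure carries the `block` action and notify a human for `alert`.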
```yaml
# schema.yml
models:
  - name: orders_fact
    columns:
      - name: order_id
        tests:
          - not_null
          - unique
      - name: customer_id
        tests:
          - not_null
          - relationships:
              to: ref('dim_customers')
              field: customer_id
      - name: order_total
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_between:
              min_value: 0
              max_value: 1000000
```
```python
# Great Expectations: `validator` is obtained from your GE data context
validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_be_unique("order_id")
validator.expect_column_values_to_be_between("order_total", min_value=0, max_value=1000000)
validator.expect_column_pair_values_a_to_be_greater_than_b("ship_date", "order_date", or_equal=True)
```
Block: Pipeline stops. Data does not propagate. Fix required.
Quarantine: Bad rows redirected to error table. Good rows continue.
Alert: Pipeline continues. Human reviews.
NEVER silently drop bad data. Every rejected row must be accounted for.
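Quarantining without silently dropping can be sketched as a split that returns both sides: every input row lands either in the good set or in the error set, tagged with the rules it failed and the run that rejected it. The rules reuse the accepted-values and value-range examples from the table; the error-row shape and `run_id` parameter are assumptions.

```python
ACCEPTED_STATUS = {"active", "inactive", "pending"}

RULES = {
    "accepted_values:status": lambda r: r.get("status") in ACCEPTED_STATUS,
    "value_range:price": lambda r: r.get("price") is not None and 0 < r["price"] < 1_000_000,
}


def split_for_quarantine(rows, run_id):
    """Return (good, quarantined); every input row lands in exactly one of the two lists."""
    good, quarantined = [], []
    for row in rows:
        failed = [name for name, rule in RULES.items() if not rule(row)]
        if failed:
            # Keep the original row plus why and when it was rejected
            quarantined.append({"row": row, "failed_rules": failed, "run_id": run_id})
        else:
            good.append(row)
    return good, quarantined
```

Writing `quarantined` to an error table and checking `len(good) + len(quarantined) == len(rows)` in the run summary makes every rejected row accountable.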
Pipelines fail. Design for it.
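```python
import logging

from tenacity import (before_sleep_log, retry, retry_if_exception_type,
                      stop_after_attempt, wait_exponential, wait_random)

logger = logging.getLogger(__name__)

# Exponential backoff with jitter; only transient errors are retried
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=60) + wait_random(0, 2),
    retry=retry_if_exception_type((ConnectionError, TimeoutError)),
    before_sleep=before_sleep_log(logger, logging.WARNING),
)
def extract_from_source(query):
    ...
```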
Retryable failures:
Retry-After)

Non-retryable failures (fix the code):
```python
for partition_date in date_range:
    try:
        process_partition(partition_date)
        mark_success(partition_date)
    except RetryableError:
        mark_failed(partition_date)
        continue  # Next partition
    except FatalError:
        mark_failed(partition_date)
        alert_and_stop()
        break
```
For streaming pipelines, route unparseable/invalid messages to a dead letter queue:
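A minimal sketch of that routing, with plain lists standing in for the output topic and the DLQ (a real deployment would use its broker's producer API). The required fields and the DLQ envelope, which records the error and keeps the original payload for replay, are assumptions.

```python
import json

REQUIRED_FIELDS = ("order_id", "amount")  # assumption: minimal message contract


def route_message(raw: bytes, output: list, dlq: list) -> None:
    """Parse and validate one message; send failures to the DLQ with context."""
    try:
        msg = json.loads(raw)
        if not isinstance(msg, dict):
            raise ValueError("payload is not a JSON object")
        missing = [f for f in REQUIRED_FIELDS if f not in msg]
        if missing:
            raise ValueError(f"missing fields: {missing}")
        output.append(msg)
    except ValueError as exc:  # JSONDecodeError and UnicodeDecodeError subclass ValueError
        # Keep the original payload so the message can be replayed after a fix
        dlq.append({"error": str(exc), "payload": raw.decode("utf-8", errors="replace")})
```

One bad message no longer stalls the stream, and DLQ size becomes a metric you can alert on.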
If you can't trace where data came from, you can't trust it.
Every output table must answer:
Column-level lineage with dbt:
```sql
-- models/orders_fact.sql
-- depends_on: {{ ref('stg_orders') }}, {{ ref('dim_customers') }}
SELECT
    o.order_id,                          -- from stg_orders.order_id
    o.order_date,                        -- from stg_orders.order_date
    c.customer_name,                     -- from dim_customers.customer_name
    o.quantity * o.price AS order_total  -- derived
FROM {{ ref('stg_orders') }} o
JOIN {{ ref('dim_customers') }} c ON o.customer_id = c.customer_id
```
Run metadata:
```sql
-- Add to every output table
_pipeline_name     VARCHAR,   -- 'orders_daily'
_pipeline_run_id   VARCHAR,   -- 'run-2024-01-15-001'
_pipeline_version  VARCHAR,   -- 'v2.3.1'
_loaded_at         TIMESTAMP  -- pipeline execution time (parameter, not NOW())
```
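Stamping these columns can live in one helper that receives the execution timestamp as an argument, so a re-run of the same logical run writes identical values. The function and parameter names are illustrative; the values in the usage note follow the examples above.

```python
from datetime import datetime


def with_run_metadata(rows, pipeline_name, run_id, version, execution_ts: datetime):
    """Return copies of rows with lineage columns; never calls datetime.now()."""
    meta = {
        "_pipeline_name": pipeline_name,
        "_pipeline_run_id": run_id,
        "_pipeline_version": version,
        "_loaded_at": execution_ts,  # supplied by the orchestrator for this run
    }
    return [{**row, **meta} for row in rows]
```

For example, `with_run_metadata(rows, "orders_daily", "run-2024-01-15-001", "v2.3.1", logical_ts)` returns the same output no matter when it is re-executed.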
Monitor these. Alert on anomalies.
| Metric | What It Tells You |
|--------|-------------------|
| Row count delta | Sudden spikes or drops = source issue or bug |
| Pipeline duration | Increasing duration = scaling problem |
| Error rate | Rising errors = source degradation |
| Data freshness | Stale data = pipeline stuck or source delayed |
| Quality check pass rate | Declining = source quality degrading |
| DLQ size (streaming) | Growing = parsing/schema issues |
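Two of these metrics, row-count delta and freshness, are simple enough to sketch directly. The trailing-average baseline over up to seven prior runs, the 20% threshold, and the 26-hour freshness SLA are assumptions to tune per pipeline.

```python
from datetime import datetime, timedelta
from statistics import mean


def check_pipeline_health(daily_counts, latest_loaded_at: datetime, now: datetime,
                          max_delta=0.20, freshness_sla=timedelta(hours=26)):
    """Return alert messages for row-count anomalies and stale data.

    daily_counts: row counts per run, oldest first; the last entry is the current run.
    now: evaluation time, passed in so the check is reproducible.
    """
    alerts = []
    *history, today = daily_counts
    if history:
        baseline = mean(history[-7:])  # trailing average of up to 7 prior runs
        if baseline and abs(today - baseline) / baseline > max_delta:
            alerts.append(f"row count {today} deviates >{max_delta:.0%} from baseline {baseline:.0f}")
    lag = now - latest_loaded_at
    if lag > freshness_sla:
        alerts.append(f"data is stale: last load {lag} ago")
    return alerts
```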
```python
# Explicit dependencies, not implicit
extract_orders >> validate_source >> transform_orders >> quality_checks >> load_mart

# NOT this
extract_orders >> transform_orders >> load_mart  # Missing validation!
```
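The missing-validation mistake can be caught mechanically. An orchestrator-independent sketch: represent the DAG as a mapping from each task to its upstream tasks, confirm it is acyclic with the standard-library `graphlib` (Python 3.9+), and require that the validation tasks are ancestors of the load. Task names match the example; `validate_dag` and `ancestors` are hypothetical helpers.

```python
from graphlib import TopologicalSorter  # Python 3.9+


def ancestors(dag, task):
    """All tasks upstream of `task`; dag maps task -> set of direct upstream tasks."""
    seen, stack = set(), list(dag.get(task, ()))
    while stack:
        t = stack.pop()
        if t not in seen:
            seen.add(t)
            stack.extend(dag.get(t, ()))
    return seen


def validate_dag(dag, load_task, required_upstream):
    """Raise if the DAG has a cycle or the load can run without a required check."""
    order = list(TopologicalSorter(dag).static_order())  # raises CycleError on cycles
    missing = set(required_upstream) - ancestors(dag, load_task)
    if missing:
        raise ValueError(f"{load_task} is not downstream of: {sorted(missing)}")
    return order
```

Running this in a unit test over the DAG definition turns "don't skip validation" from a review comment into a failing build.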
Rules for DAGs:
If you catch yourself:
NOW() inside a transformation

ALL of these mean: STOP. Return to Phase 1.
| Excuse | Reality |
|--------|---------|
| "One-time load, doesn't need idempotency" | One-time loads always run again. Build it right. |
| "Source data is clean" | Profile it. It isn't. Data quality issues always exist. |
| "Quality checks slow the pipeline down" | Silent bad data costs more than pipeline latency. |
| "We'll add lineage tracking later" | Later never comes. Lineage is foundational, not optional. |
| "Full load is simpler" | Full load doesn't scale. Design incremental from the start. |
| "Small dataset, don't need partitioning" | Small datasets grow. Partition early or redesign later. |
| "Just an internal dashboard" | Internal consumers deserve correct data too. |
| "CDC is overkill" | Missing deletes and updates is data corruption. Evaluate CDC honestly. |
| "Schema won't change" | Schemas always change. Design for evolution. |
| "We can fix data manually" | Manual fixes don't scale and aren't auditable. Automate. |
| Phase | Key Activities | Success Criteria |
|-------|----------------|------------------|
| 1. Schema Design | Profile source, choose model, define layers | Documented schema, clear layers |
| 2. Idempotency | DELETE-INSERT/MERGE, parameterized timestamps | Re-run produces identical output |
| 3. Load Strategy | Incremental, CDC, or full with justification | Efficient, handles late arrivals |
| 4. Quality Checks | Null, unique, referential, range, freshness | Every step has verifiable checks |
| 5. Error Handling | Retry, partition, dead letter, alerting | Failures are handled, not silent |
| 6. Lineage | Source tracking, run metadata, observability | Every output traceable to source |
Before deploying any pipeline:
Can't check all boxes? You're not ready to deploy.
This skill integrates with:
Complementary skills:
Profile source → design schema → implement idempotent steps → verify quality at every boundary
Otherwise → not data engineering
No pipeline ships without idempotency and quality checks. No exceptions without your human partner's permission.