skills/data-pipeline-spec/SKILL.md
Design an ETL/ELT data pipeline specification. Use when asked to design a data pipeline, spec an ETL or ELT process, document a data ingestion workflow, or plan a data integration. Produces a complete pipeline spec with sources, transforms, destinations, SLAs, error handling, and data quality rules.
Install this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.

`npx skillsauth add mohitagw15856/pm-claude-skills data-pipeline-spec`
This skill produces a complete data pipeline specification covering sources, transformations, destinations, scheduling, SLAs, error handling, data quality checks, and monitoring requirements. Output is ready for engineering handoff or architecture review.
Ask the user for these if not provided:
Purpose: [One sentence: what decision or workflow does this pipeline enable?]
Type: [ETL / ELT / Streaming / Batch]
Owner: [Team or individual]
Version: [1.0]
Date: [Date]
Status: [Draft / Under Review / Approved]
[2–3 sentences describing the pipeline end-to-end: what data moves, from where to where, at what cadence, and why.]
Architecture diagram (text):
[Source A] ──┐
[Source B] ──┤──► [Ingestion Layer] ──► [Transform Layer] ──► [Destination] ──► [Consumers]
[Source C] ──┘
| Source | System | Connection type | Data format | Update pattern | Volume |
|---|---|---|---|---|---|
| [Source 1] | [PostgreSQL / Salesforce / S3 / Kafka] | [JDBC / REST API / SDK / Webhook] | [JSON / CSV / Parquet / CDC] | [Append / Full refresh / Incremental] | [X rows/day] |
| [Source 2] | [...] | [...] | [...] | [...] | [...] |
Incremental key (if applicable): [The column used to identify new or changed records — e.g. updated_at, event_id]
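A minimal sketch of watermark-based incremental extraction, assuming source rows arrive as Python dicts and `updated_at` is the incremental key. `extract_incremental` is a hypothetical helper, not part of any connector's API.

```python
from datetime import datetime, timezone

def extract_incremental(rows, last_watermark, key="updated_at"):
    """Return (rows changed after last_watermark, new watermark).

    rows: hypothetical in-memory stand-in for a source query result.
    """
    new_rows = [r for r in rows if r[key] > last_watermark]
    # Advance the watermark only if something new arrived.
    new_watermark = max((r[key] for r in new_rows), default=last_watermark)
    return new_rows, new_watermark

source = [
    {"id": 1, "updated_at": datetime(2024, 1, 1, 5, tzinfo=timezone.utc)},
    {"id": 2, "updated_at": datetime(2024, 1, 2, 5, tzinfo=timezone.utc)},
]
batch, watermark = extract_incremental(source, datetime(2024, 1, 1, 12, tzinfo=timezone.utc))
```

The strict `>` comparison can miss late-arriving rows that share the watermark timestamp; a common alternative is `>=` (or a short look-back window) combined with deduplication on the primary key.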
Authentication: [API key / OAuth / IAM role / connection string — note where credentials are stored]
Tool: [Fivetran / Airbyte / Kafka Connect / custom script / dbt source]
Ingestion method:
Raw landing zone: [Where raw data lands before transformation — e.g. raw.salesforce_opportunities in Snowflake, S3 bucket s3://data-raw/crm/]
Schema handling: [Strict schema enforcement / Schema evolution allowed / Union schema]
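To make the difference between strict enforcement and schema evolution concrete, here is a sketch that checks one incoming record against an expected contract. `EXPECTED_SCHEMA`, its columns, and the `"strict"` / `"evolve"` mode labels are illustrative assumptions; union-schema handling is not covered.

```python
# Hypothetical contract for an events source: column name -> Python type.
EXPECTED_SCHEMA = {"event_id": str, "user_id": str, "amount": float}

def check_schema(record, expected, mode="strict"):
    """Return a list of problems; an empty list means the record is accepted."""
    problems = []
    for col, typ in expected.items():
        if col not in record:
            problems.append(f"missing column: {col}")
        elif not isinstance(record[col], typ):
            problems.append(
                f"type mismatch: {col} is {type(record[col]).__name__}, expected {typ.__name__}"
            )
    extra = sorted(set(record) - set(expected))
    # Under schema evolution, new columns are tolerated; strict mode rejects them.
    if extra and mode == "strict":
        problems.append(f"unexpected columns: {', '.join(extra)}")
    return problems
```

Missing columns and type mismatches are flagged in both modes, which matches the "Schema change in source" failure mode further down.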
List each transformation in execution order. For ELT pipelines, this is the dbt model or SQL layer.
| Step | Name | Description | Input | Output | Tool |
|---|---|---|---|---|---|
| 1 | [Deduplicate events] | [Remove duplicate event rows based on event_id] | raw.events | staging.events_deduped | [dbt / SQL / Spark] |
| 2 | [Join user profile] | [Enrich events with user attributes from CRM] | staging.events_deduped, raw.users | staging.events_enriched | [...] |
| 3 | [Aggregate to daily] | [Roll up to user×day grain] | staging.events_enriched | mart.user_daily_activity | [...] |
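The three example steps can be sketched in plain Python to make the grain changes concrete. This assumes rows are dicts, that the enrichment is a left join (the table does not specify the join type), and that `plan_tier`, `event_date`, and `event_count` are illustrative column names; in the pipeline itself these would be dbt models or SQL.

```python
from collections import defaultdict

def deduplicate(events):
    """Step 1: keep the first row seen for each event_id."""
    seen, out = set(), []
    for e in events:
        if e["event_id"] not in seen:
            seen.add(e["event_id"])
            out.append(e)
    return out

def enrich(events, users):
    """Step 2: left-join user attributes on user_id; unmatched users get plan_tier None."""
    by_id = {u["user_id"]: u for u in users}
    return [{**e, "plan_tier": by_id.get(e["user_id"], {}).get("plan_tier")} for e in events]

def aggregate_daily(events):
    """Step 3: roll up to one row per (user_id, event_date) with an event count."""
    counts = defaultdict(int)
    for e in events:
        counts[(e["user_id"], e["event_date"])] += 1
    return [
        {"user_id": u, "event_date": d, "event_count": n}
        for (u, d), n in sorted(counts.items())
    ]
```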
Business logic rules:
- [… payment_confirmed_at, not payment_initiated_at]
- [… domain are excluded from all metrics]

Slowly Changing Dimensions (SCD), if applicable:
- [… users.plan_tier is SCD Type 2: keep history of plan changes with valid_from / valid_to]

| Destination | System | Schema / Table | Write mode | Consumers |
|---|---|---|---|---|
| [Primary] | [Snowflake / BigQuery / Redshift / PostgreSQL] | [analytics.mart_user_activity] | [Append / Upsert / Full replace] | [Looker / Metabase / downstream pipeline] |
| [Secondary] | [...] | [...] | [...] | [...] |
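An SCD Type 2 rule such as the `users.plan_tier` example changes how a dimension is written: a changed value closes the current row and opens a new one instead of overwriting it. A minimal in-memory sketch, assuming half-open validity ranges (`valid_to` is exclusive and `None` marks the current row); `apply_scd2` is a hypothetical helper, and deleted source rows are not handled.

```python
from datetime import date

def apply_scd2(history, updates, as_of):
    """Apply SCD Type 2 changes for plan_tier and return a new history list.

    history: dicts with user_id, plan_tier, valid_from, valid_to (None = current row).
    updates: dict of user_id -> latest plan_tier from the source snapshot.
    as_of: date the change takes effect. The input list is not modified.
    """
    result = [dict(row) for row in history]
    current = {r["user_id"]: r for r in result if r["valid_to"] is None}
    for user_id, tier in updates.items():
        row = current.get(user_id)
        if row is not None and row["plan_tier"] == tier:
            continue  # unchanged: keep the open row as-is
        if row is not None:
            row["valid_to"] = as_of  # close the previous version
        result.append({"user_id": user_id, "plan_tier": tier, "valid_from": as_of, "valid_to": None})
    return result
```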
Partitioning / Clustering: [e.g. Partitioned by event_date, clustered by user_id — reduces query cost for time-range scans]
Retention policy: [e.g. Raw data retained for 90 days; mart tables retained indefinitely]
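On a date-partitioned table, a retention rule like the 90-day raw-data example reduces to finding partitions older than a cutoff. A sketch assuming one partition per `event_date`; `partitions_to_drop` is hypothetical, and the actual drop would use the warehouse's own DDL or storage lifecycle rules.

```python
from datetime import date, timedelta

def partitions_to_drop(partition_dates, today, retention_days):
    """Return partition dates strictly older than today - retention_days, oldest first."""
    cutoff = today - timedelta(days=retention_days)
    return sorted(d for d in partition_dates if d < cutoff)
```

With a 90-day window on 2024-04-30 the cutoff is 2024-01-31, so that partition is kept and 2024-01-30 is dropped.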
| SLA | Target | Breach action |
|---|---|---|
| Data freshness | [Data must be ≤ X hours old by HH:MM UTC] | [Page on-call / alert Slack channel] |
| Pipeline completion | [Must complete within X minutes of trigger] | [Alert and auto-retry] |
| Availability | [Pipeline must run successfully X% of days per month] | [Incident review] |
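The three SLA rows can be evaluated from run metadata. A sketch assuming timezone-aware UTC timestamps for the newest loaded record and for the run start and finish; `check_slas` and `availability` are hypothetical names, and the thresholds are passed in rather than read from any tool.

```python
from datetime import datetime, timedelta, timezone

def check_slas(latest_loaded_at, run_started_at, run_finished_at, now,
               max_age_hours, max_runtime_minutes):
    """Return the names of breached SLAs (empty list if all are met)."""
    breaches = []
    if now - latest_loaded_at > timedelta(hours=max_age_hours):
        breaches.append("freshness")
    if run_finished_at - run_started_at > timedelta(minutes=max_runtime_minutes):
        breaches.append("completion")
    return breaches

def availability(daily_success):
    """Fraction of scheduled days with a successful run; daily_success is a list of bools."""
    if not daily_success:
        raise ValueError("no runs in period")
    return sum(daily_success) / len(daily_success)
```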
Schedule: [Cron expression and human description — e.g. 0 6 * * * — daily at 06:00 UTC]
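For the fixed daily case in the example (`0 6 * * *`), the next run time can be computed directly. This sketch handles only a single daily UTC time, not general cron syntax; `next_daily_run` is hypothetical, and a scheduler or a cron-parsing library would cover the general case.

```python
from datetime import datetime, time, timedelta, timezone

def next_daily_run(now, run_at):
    """Return the next datetime at run_at (UTC) strictly after now.

    now must be timezone-aware; it is normalised to UTC first.
    """
    now = now.astimezone(timezone.utc)
    candidate = datetime.combine(now.date(), run_at, tzinfo=timezone.utc)
    if candidate <= now:
        candidate += timedelta(days=1)  # today's slot has passed; use tomorrow's
    return candidate
```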
Trigger type:
Backfill strategy: [How to reprocess historical data if the pipeline fails or logic changes — e.g. parameterised date range, full drop-and-reload]
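A parameterised date-range backfill usually runs once per daily partition. A sketch of generating that range; `backfill_dates` is hypothetical, and it assumes each daily run overwrites its own partition so re-running a date is safe.

```python
from datetime import date, timedelta

def backfill_dates(start, end):
    """Return every partition date from start to end, inclusive."""
    if end < start:
        raise ValueError("end must not be before start")
    return [start + timedelta(days=i) for i in range((end - start).days + 1)]

# Usage (run_pipeline is a hypothetical entry point taking a run date):
# for run_date in backfill_dates(date(2024, 2, 27), date(2024, 3, 1)):
#     run_pipeline(run_date=run_date)
```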
| Check | Table | Rule | Failure action |
|---|---|---|---|
| Completeness | staging.events | event_id IS NOT NULL — 100% of rows | Block load / Alert |
| Uniqueness | mart.user_daily_activity | (user_id, date) must be unique | Block load |
| Freshness | mart.user_daily_activity | max(event_date) >= CURRENT_DATE - 1 | Alert |
| Volume | staging.events | Row count within ±20% of 7-day average | Alert |
| Referential integrity | staging.events | All user_id values exist in users table | Alert |
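The five checks in the table can be expressed as plain predicates over in-memory rows, which is roughly what a dbt test or SQL assertion does inside the warehouse. A sketch assuming rows are dicts with the columns named in the table, `event_date` values are `datetime.date` objects, and the volume baseline is a list of recent daily row counts; `run_dq_checks` is hypothetical.

```python
from datetime import date, timedelta

def run_dq_checks(events, daily, known_user_ids, today, history_counts):
    """Return the names of failed checks, in the order of the table above.

    events: staging.events rows; daily: mart.user_daily_activity rows;
    known_user_ids: set of user_id values in the users table;
    history_counts: row counts for recent days (e.g. the previous 7).
    """
    failures = []
    if any(e.get("event_id") is None for e in events):
        failures.append("completeness")
    keys = [(r["user_id"], r["event_date"]) for r in daily]
    if len(keys) != len(set(keys)):
        failures.append("uniqueness")
    # Passes when max(event_date) >= today - 1, as in the freshness rule.
    if not daily or max(r["event_date"] for r in daily) < today - timedelta(days=1):
        failures.append("freshness")
    if history_counts:
        avg = sum(history_counts) / len(history_counts)
        if abs(len(events) - avg) > 0.2 * avg:  # outside +/-20% of the average
            failures.append("volume")
    if any(e.get("user_id") not in known_user_ids for e in events):
        failures.append("referential_integrity")
    return failures
```

Mapping each returned name to "Block load" or "Alert" then follows the failure-action column.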
DQ tool: [dbt tests / Great Expectations / Monte Carlo / custom SQL assertions]
Retry policy: [e.g. 3 retries with exponential back-off: 5 min, 20 min, 60 min]
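A sketch of the example retry policy: one initial attempt plus three retries, waiting 5, 20, and 60 minutes between them. The `sleep` parameter is injectable so the logic can be tested without waiting; `run_with_retries` is a hypothetical helper, not an orchestrator API. Note that 5/20/60 is a hand-picked increasing schedule rather than a strict doubling.

```python
import time

RETRY_DELAYS_SECONDS = (5 * 60, 20 * 60, 60 * 60)  # 5, 20, 60 minutes

def run_with_retries(task, delays=RETRY_DELAYS_SECONDS, sleep=time.sleep):
    """Call task(); on any exception, wait and retry once per entry in delays.

    After the final retry fails, the last exception is re-raised so the caller can alert.
    """
    for delay in [*delays, None]:
        try:
            return task()
        except Exception:
            if delay is None:
                raise
            sleep(delay)
```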
Failure modes and responses:
| Failure | Detection | Response | Owner |
|---|---|---|---|
| Source unavailable | HTTP 5xx / connection timeout | Retry 3×, then alert and skip run | Data engineering |
| Schema change in source | Column missing or type mismatch | Block load, alert schema owner | Data owner + engineering |
| DQ check fails | dbt test failure / assertion error | Block load for P1 checks; alert for P2 | Data engineering |
| Partial load | Row count < expected threshold | Alert; do not publish to consumers until resolved | Data engineering |
Dead-letter queue: [Where failed records are routed for manual inspection — e.g. raw.dlq_events]
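Routing bad records to a dead-letter queue, and refusing to publish when too many fail, might look like this. A sketch assuming a `validate` callable that raises `ValueError` for a bad record and a hypothetical `max_dead_ratio` threshold standing in for the partial-load rule; both function names are illustrative.

```python
def route_records(records, validate):
    """Split records into (good, dead_letter); each DLQ entry keeps the record and its error."""
    good, dead = [], []
    for rec in records:
        try:
            validate(rec)
        except ValueError as exc:
            dead.append({"record": rec, "error": str(exc)})
        else:
            good.append(rec)
    return good, dead

def should_publish(n_good, n_dead, max_dead_ratio=0.01):
    """Publish only if the share of dead-lettered records is within the threshold."""
    total = n_good + n_dead
    if total == 0:
        return False  # nothing loaded: treat as a partial load
    return n_dead / total <= max_dead_ratio
```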
Metrics to track:
Alerting:
Logging: [What gets logged and where — e.g. Airflow task logs to CloudWatch, structured JSON to data lake]
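A minimal structured-logging sketch with the standard `logging` and `json` modules: one JSON object per line, so log tools can filter by field. The logger name `pipeline` and the field names are assumptions; shipping the output to CloudWatch or a data lake is left to the deployment.

```python
import json
import logging

logger = logging.getLogger("pipeline")  # hypothetical logger name

def log_event(stage, status, **fields):
    """Emit one JSON log line and return it.

    INFO lines are only shown once a handler is configured,
    e.g. logging.basicConfig(level=logging.INFO).
    """
    line = json.dumps({"stage": stage, "status": status, **fields}, sort_keys=True, default=str)
    logger.info(line)
    return line
```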
Upstream dependencies: [Which pipelines or data sources must succeed before this pipeline runs?]
Downstream dependents: [Which dashboards, pipelines, or models depend on this pipeline's output?]
[upstream pipeline A] ──► THIS PIPELINE ──► [downstream dashboard B]
                                        └──► [downstream pipeline C]
Coordination mechanism: [Airflow DAG dependency / dbt ref() / event trigger / manual gate]
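Whatever the coordination mechanism, the dependency diagram implies an execution order. A sketch using the standard library's `graphlib.TopologicalSorter` (Python 3.9+) with hypothetical node names mirroring the diagram; a circular dependency raises `graphlib.CycleError`.

```python
from graphlib import TopologicalSorter

# Hypothetical dependency map: each node lists the nodes it depends on.
DEPENDENCIES = {
    "this_pipeline": {"upstream_pipeline_a"},
    "downstream_dashboard_b": {"this_pipeline"},
    "downstream_pipeline_c": {"this_pipeline"},
}

def run_order(deps):
    """Return one valid execution order; independent nodes may appear in any order."""
    return list(TopologicalSorter(deps).static_order())
```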
email, ip_address, name]