plugins/pm-data/skills/data-pipeline-spec/SKILL.md
Design an ETL/ELT data pipeline specification. Use when asked to design a data pipeline, spec an ETL or ELT process, document a data ingestion workflow, or plan a data integration. Produces a complete pipeline spec with sources, transforms, destinations, SLAs, error handling, and data quality rules.
npx skillsauth add mohitagw15856/pm-claude-skills data-pipeline-spec
Install this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
This skill produces a complete data pipeline specification covering sources, transformations, destinations, scheduling, SLAs, error handling, data quality checks, and monitoring requirements. Output is ready for engineering handoff or architecture review.
Ask the user for these if not provided:
Purpose: [One sentence: what decision or workflow does this pipeline enable?]
Type: [ETL / ELT / Streaming / Batch]
Owner: [Team or individual]
Version: [1.0]
Date: [Date]
Status: [Draft / Under Review / Approved]
[2–3 sentences describing the pipeline end-to-end: what data moves, from where to where, at what cadence, and why.]
Architecture diagram (text):
[Source A] ──┐
[Source B] ──┤──► [Ingestion Layer] ──► [Transform Layer] ──► [Destination] ──► [Consumers]
[Source C] ──┘
| Source | System | Connection type | Data format | Update pattern | Volume |
|---|---|---|---|---|---|
| [Source 1] | [PostgreSQL / Salesforce / S3 / Kafka] | [JDBC / REST API / SDK / Webhook] | [JSON / CSV / Parquet / CDC] | [Append / Full refresh / Incremental] | [X rows/day] |
| [Source 2] | [...] | [...] | [...] | [...] | [...] |
Incremental key (if applicable): [The column used to identify new or changed records — e.g. updated_at, event_id]
Authentication: [API key / OAuth / IAM role / connection string — note where credentials are stored]
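To illustrate how an incremental key is typically used, here is a minimal Python sketch. It assumes rows arrive as dictionaries with an `updated_at` column and that the last watermark is persisted between runs; `extract_incremental` and the sample rows are hypothetical, not part of this skill.

```python
from datetime import datetime

def extract_incremental(rows, incremental_key, last_watermark):
    """Return rows changed after last_watermark, plus the new watermark."""
    changed = [r for r in rows if r[incremental_key] > last_watermark]
    # If nothing changed, keep the previous watermark
    new_watermark = max((r[incremental_key] for r in changed), default=last_watermark)
    return changed, new_watermark

# Illustrative data only
source_rows = [
    {"id": 1, "updated_at": datetime(2024, 1, 1, 8, 0)},
    {"id": 2, "updated_at": datetime(2024, 1, 2, 9, 30)},
    {"id": 3, "updated_at": datetime(2024, 1, 3, 7, 15)},
]
changed, watermark = extract_incremental(
    source_rows, "updated_at", datetime(2024, 1, 1, 12, 0)
)
```

The next run would pass `watermark` back in as `last_watermark`, so only rows modified since the previous run are read.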
Tool: [Fivetran / Airbyte / Kafka Connect / custom script / dbt source]
Ingestion method:
Raw landing zone: [Where raw data lands before transformation — e.g. raw.salesforce_opportunities in Snowflake, S3 bucket s3://data-raw/crm/]
Schema handling: [Strict schema enforcement / Schema evolution allowed / Union schema]
List each transformation in execution order. For ELT pipelines, this is the dbt model or SQL layer.
| Step | Name | Description | Input | Output | Tool |
|---|---|---|---|---|---|
| 1 | [Deduplicate events] | [Remove duplicate event rows based on event_id] | raw.events | staging.events_deduped | [dbt / SQL / Spark] |
| 2 | [Join user profile] | [Enrich events with user attributes from CRM] | staging.events_deduped, raw.users | staging.events_enriched | [...] |
| 3 | [Aggregate to daily] | [Roll up to user×day grain] | staging.events_enriched | mart.user_daily_activity | [...] |
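The three example steps in the table can be sketched in plain Python to make the execution order concrete. This is an illustration only: in practice these would more likely be dbt models or SQL, and the function names, the `plan_tier` attribute, and the sample rows are assumptions.

```python
from collections import defaultdict

def deduplicate_events(events):
    """Step 1: keep the first row seen for each event_id."""
    seen = set()
    deduped = []
    for e in events:
        if e["event_id"] not in seen:
            seen.add(e["event_id"])
            deduped.append(e)
    return deduped

def join_user_profile(events, users):
    """Step 2: enrich each event with a user attribute (None if no match)."""
    by_id = {u["user_id"]: u for u in users}
    return [
        {**e, "plan_tier": by_id.get(e["user_id"], {}).get("plan_tier")}
        for e in events
    ]

def aggregate_to_daily(events):
    """Step 3: roll up to one row per (user_id, event_date)."""
    counts = defaultdict(int)
    for e in events:
        counts[(e["user_id"], e["event_date"])] += 1
    return [
        {"user_id": u, "event_date": d, "event_count": n}
        for (u, d), n in sorted(counts.items())
    ]

# Illustrative data only
raw_events = [
    {"event_id": "e1", "user_id": "u1", "event_date": "2024-01-01"},
    {"event_id": "e1", "user_id": "u1", "event_date": "2024-01-01"},  # duplicate
    {"event_id": "e2", "user_id": "u1", "event_date": "2024-01-01"},
    {"event_id": "e3", "user_id": "u2", "event_date": "2024-01-02"},
]
raw_users = [
    {"user_id": "u1", "plan_tier": "pro"},
    {"user_id": "u2", "plan_tier": "free"},
]
enriched = join_user_profile(deduplicate_events(raw_events), raw_users)
mart = aggregate_to_daily(enriched)
```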
Business logic rules:
payment_confirmed_at, not payment_initiated_at]
[email protected] domain are excluded from all metrics]
Slowly Changing Dimensions (SCD), if applicable:
users.plan_tier is SCD Type 2: keep history of plan changes with valid_from / valid_to]

| Destination | System | Schema / Table | Write mode | Consumers |
|---|---|---|---|---|
| [Primary] | [Snowflake / BigQuery / Redshift / PostgreSQL] | [analytics.mart_user_activity] | [Append / Upsert / Full replace] | [Looker / Metabase / downstream pipeline] |
| [Secondary] | [...] | [...] | [...] | [...] |
Partitioning / Clustering: [e.g. Partitioned by event_date, clustered by user_id — reduces query cost for time-range scans]
Retention policy: [e.g. Raw data retained for 90 days; mart tables retained indefinitely]
| SLA | Target | Breach action |
|---|---|---|
| Data freshness | [Data must be ≤ X hours old by HH:MM UTC] | [Page on-call / alert Slack channel] |
| Pipeline completion | [Must complete within X minutes of trigger] | [Alert and auto-retry] |
| Availability | [Pipeline must run successfully X% of days per month] | [Incident review] |
Schedule: [Cron expression and human description — e.g. 0 6 * * * — daily at 06:00 UTC]
Trigger type:
Backfill strategy: [How to reprocess historical data if the pipeline fails or logic changes — e.g. parameterised date range, full drop-and-reload]
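A parameterised date-range backfill could look roughly like the sketch below. It assumes the pipeline can be run for a single date partition; `backfill_partitions`, `run_backfill`, and the `run_for_date` callback are hypothetical names used for illustration.

```python
from datetime import date, timedelta

def backfill_partitions(start, end):
    """Yield every date from start to end, inclusive."""
    current = start
    while current <= end:
        yield current
        current += timedelta(days=1)

def run_backfill(start, end, run_for_date):
    """Re-run the pipeline once per date partition, oldest first."""
    return [run_for_date(d) for d in backfill_partitions(start, end)]

# Stand-in for the real per-date pipeline run: just echo the partition
processed = run_backfill(date(2024, 2, 27), date(2024, 3, 1), lambda d: d.isoformat())
```

Running one partition at a time keeps each rerun small and idempotent, provided the write mode for that partition is an upsert or a replace rather than an append.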
| Check | Table | Rule | Failure action |
|---|---|---|---|
| Completeness | staging.events | event_id IS NOT NULL — 100% of rows | Block load / Alert |
| Uniqueness | mart.user_daily_activity | (user_id, date) must be unique | Block load |
| Freshness | mart.user_daily_activity | max(event_date) >= CURRENT_DATE - 1 | Alert |
| Volume | staging.events | Row count within ±20% of 7-day average | Alert |
| Referential integrity | staging.events | All user_id values exist in users table | Alert |
DQ tool: [dbt tests / Great Expectations / Monte Carlo / custom SQL assertions]
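As a rough illustration of three of the checks in the table (completeness, uniqueness, volume), here is a plain-Python sketch. A real pipeline would more likely express these in the chosen DQ tool; the function names and sample rows are assumptions.

```python
def check_completeness(rows, column):
    """True when every row has a non-null value in column."""
    return all(r.get(column) is not None for r in rows)

def check_uniqueness(rows, key_columns):
    """True when no two rows share the same key tuple."""
    keys = [tuple(r[c] for c in key_columns) for r in rows]
    return len(keys) == len(set(keys))

def check_volume(row_count, trailing_counts, tolerance=0.20):
    """True when row_count is within ±tolerance of the trailing average."""
    average = sum(trailing_counts) / len(trailing_counts)
    return abs(row_count - average) <= tolerance * average

# Illustrative data only
rows = [
    {"event_id": "e1", "user_id": "u1", "date": "2024-01-01"},
    {"event_id": "e2", "user_id": "u1", "date": "2024-01-02"},
    {"event_id": None, "user_id": "u2", "date": "2024-01-01"},
]
results = {
    "completeness": check_completeness(rows, "event_id"),
    "uniqueness": check_uniqueness(rows, ["user_id", "date"]),
    "volume": check_volume(115, [100] * 7),
}
```

Each result maps onto the failure action column: a failed completeness or uniqueness check would block the load, while a failed volume check would only raise an alert.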
Retry policy: [e.g. 3 retries with exponential back-off: 5 min, 20 min, 60 min]
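The retry policy can be sketched as a loop over a fixed delay schedule. This is a minimal sketch, assuming the task raises an exception on failure; `run_with_retries` is a hypothetical helper, and the delays mirror the 5, 20, and 60 minute example above. The `sleep` parameter is injectable so the logic can be exercised without actually waiting.

```python
import time

def run_with_retries(task, delays_seconds, sleep=time.sleep):
    """Run task; after each failure wait for the next delay and retry.

    With N delays the task is attempted at most N + 1 times. The final
    failure is re-raised so the orchestrator can alert on it.
    Returns (result, attempts_used).
    """
    attempts = 0
    for delay in [*delays_seconds, None]:
        attempts += 1
        try:
            return task(), attempts
        except Exception:
            if delay is None:  # no retries left
                raise
            sleep(delay)

# Illustrative task that fails twice, then succeeds
calls = {"n": 0}

def flaky_task():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("source unavailable")
    return "loaded"

waits = []  # record requested waits instead of sleeping
result, attempts = run_with_retries(flaky_task, [300, 1200, 3600], sleep=waits.append)
```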
Failure modes and responses:
| Failure | Detection | Response | Owner |
|---|---|---|---|
| Source unavailable | HTTP 5xx / connection timeout | Retry 3×, then alert and skip run | Data engineering |
| Schema change in source | Column missing or type mismatch | Block load, alert schema owner | Data owner + engineering |
| DQ check fails | dbt test failure / assertion error | Block load for P1 checks; alert for P2 | Data engineering |
| Partial load | Row count < expected threshold | Alert; do not publish to consumers until resolved | Data engineering |
Dead-letter queue: [Where failed records are routed for manual inspection — e.g. raw.dlq_events]
Metrics to track:
Alerting:
Logging: [What gets logged and where — e.g. Airflow task logs to CloudWatch, structured JSON to data lake]
Upstream dependencies: [Which pipelines or data sources must succeed before this pipeline runs?]
Downstream dependents: [Which dashboards, pipelines, or models depend on this pipeline's output?]
[upstream pipeline A] ──► THIS PIPELINE ──► [downstream dashboard B]
                                        └──► [downstream pipeline C]
Coordination mechanism: [Airflow DAG dependency / dbt ref() / event trigger / manual gate]
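To show how the dependency diagram translates into a run order, here is a small sketch using Python's standard-library `graphlib` (Python 3.9+). The node names are placeholders taken from the diagram; an orchestrator such as Airflow would normally resolve this ordering itself.

```python
from graphlib import TopologicalSorter

# Map each node to the set of nodes it depends on (placeholder names)
dependencies = {
    "this_pipeline": {"upstream_pipeline_a"},
    "downstream_dashboard_b": {"this_pipeline"},
    "downstream_pipeline_c": {"this_pipeline"},
}

# static_order() yields every node after all of its dependencies
run_order = list(TopologicalSorter(dependencies).static_order())
```

The upstream pipeline comes first and this pipeline second; the relative order of the two downstream dependents is not guaranteed, since neither depends on the other.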
email, ip_address, name]