skills/data-pipeline-patterns/SKILL.md
ETL/ELT patterns, batch vs streaming, idempotency, data quality framework, and pipeline orchestration
| Criterion | ETL | ELT |
|-----------|-----|-----|
| Transform location | In the pipeline | In the data warehouse |
| Data volume | Small to medium | Large |
| Flexibility | Low | High |
| Cost | Compute-heavy | Storage-heavy |
| Use case | Legacy, compliance | Modern analytics |

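The sketch below computes the same result both ways to show where the transform runs in each approach. It is a minimal example that uses an in-memory SQLite database as a stand-in for the warehouse. The `orders_*` tables and the cents-to-dollars transform are hypothetical.

```python
import sqlite3

# Hypothetical raw source rows: (order_id, amount_in_cents)
RAW_ORDERS = [(1, 1250), (2, 990), (3, 4000)]


def run_etl(conn: sqlite3.Connection) -> None:
    """ETL: transform in the pipeline (Python), then load the finished rows."""
    transformed = [(oid, cents / 100) for oid, cents in RAW_ORDERS]
    conn.execute("CREATE TABLE orders_etl (order_id INTEGER, amount_usd REAL)")
    conn.executemany("INSERT INTO orders_etl VALUES (?, ?)", transformed)


def run_elt(conn: sqlite3.Connection) -> None:
    """ELT: load the raw rows first, then transform with SQL inside the database."""
    conn.execute("CREATE TABLE orders_raw (order_id INTEGER, amount_cents INTEGER)")
    conn.executemany("INSERT INTO orders_raw VALUES (?, ?)", RAW_ORDERS)
    conn.execute(
        "CREATE TABLE orders_elt AS "
        "SELECT order_id, amount_cents / 100.0 AS amount_usd FROM orders_raw"
    )


conn = sqlite3.connect(":memory:")  # stands in for the data warehouse
run_etl(conn)
run_elt(conn)
print(conn.execute("SELECT * FROM orders_etl ORDER BY order_id").fetchall())
print(conn.execute("SELECT * FROM orders_elt ORDER BY order_id").fetchall())
```

Both tables end up identical. The difference is that in the ELT case the raw table is still in the warehouse. The transform can therefore be rewritten and re-run without extracting the data again.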
| Criterion | Batch | Streaming |
|-----------|-------|-----------|
| Latency | Minutes to hours | Seconds to milliseconds |
| Complexity | Low | High |
| Cost | Low | High |
| Use case | Reporting, ETL | Real-time alerts, dashboards |
| Tools | Airflow, dbt | Kafka Streams, Flink |

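The latency and complexity rows largely come down to two things: when results are produced, and how much state has to be kept between events. Here is a toy, in-memory sketch in plain Python. It is not Kafka Streams or Flink, and the click events are made up.

```python
from collections import defaultdict
from typing import Dict, Iterable, Iterator, Tuple

# Hypothetical click events: (user_id, clicks)
EVENTS = [("a", 1), ("b", 2), ("a", 3)]


def batch_totals(events: Iterable[Tuple[str, int]]) -> Dict[str, int]:
    """Batch: wait for the complete dataset, then aggregate it in one pass."""
    totals: Dict[str, int] = defaultdict(int)
    for user, clicks in events:
        totals[user] += clicks
    return dict(totals)


def streaming_totals(events: Iterable[Tuple[str, int]]) -> Iterator[Dict[str, int]]:
    """Streaming: update running state per event and emit a result immediately."""
    state: Dict[str, int] = defaultdict(int)  # long-lived state the processor must manage
    for user, clicks in events:
        state[user] += clicks
        yield dict(state)  # an up-to-date view after every event


print(batch_totals(EVENTS))
for snapshot in streaming_totals(EVENTS):
    print(snapshot)
```

The streaming version produces answers sooner. In a real system, though, the running `state` must survive restarts and handle late or duplicate events, and that is where the extra complexity and cost come from.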
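```sql
-- Pattern 1: Upsert (PostgreSQL syntax; %(id)s-style placeholders are psycopg bind parameters)
-- ON CONFLICT (id) requires a primary key or unique index on target.id
INSERT INTO target (id, name, updated_at)
VALUES (%(id)s, %(name)s, %(ts)s)
ON CONFLICT (id) DO UPDATE SET
    name = EXCLUDED.name,
    updated_at = EXCLUDED.updated_at;
```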
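An upsert is idempotent because replaying the same row leaves the table in the same state. The minimal sketch below demonstrates that property. It assumes SQLite 3.24+, which supports the same `ON CONFLICT ... DO UPDATE` syntax, in place of PostgreSQL, and it uses SQLite's `:name` placeholders instead of psycopg's `%(name)s`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE target (id INTEGER PRIMARY KEY, name TEXT, updated_at TEXT)")

UPSERT = """
INSERT INTO target (id, name, updated_at)
VALUES (:id, :name, :ts)
ON CONFLICT (id) DO UPDATE SET
    name = excluded.name,
    updated_at = excluded.updated_at
"""

row = {"id": 1, "name": "alice", "ts": "2026-03-14T06:00:00"}
for _ in range(3):  # simulate the same batch being retried three times
    with conn:  # commit each attempt
        conn.execute(UPSERT, row)

# A later, changed version of the same key updates the row in place
with conn:
    conn.execute(UPSERT, {**row, "name": "alice b.", "ts": "2026-03-15T06:00:00"})

print(conn.execute("SELECT COUNT(*) FROM target").fetchone())
print(conn.execute("SELECT name FROM target WHERE id = 1").fetchone())
```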
```sql
-- Pattern 2: Partition overwrite (one transaction, so a failed insert cannot leave the partition empty)
BEGIN;
DELETE FROM target WHERE partition_date = '2026-03-14';
INSERT INTO target SELECT * FROM staging WHERE partition_date = '2026-03-14';
COMMIT;
```
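The delete-then-insert pair is only safe if both statements commit together. Otherwise a crash between them leaves the partition empty. The sketch below uses Python's built-in `sqlite3` module, whose connection context manager commits on success and rolls back on an exception. The table contents are made up.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE staging (partition_date TEXT, value INTEGER)")
conn.execute("CREATE TABLE target (partition_date TEXT, value INTEGER)")
conn.executemany(
    "INSERT INTO staging VALUES (?, ?)",
    [("2026-03-14", 10), ("2026-03-14", 20), ("2026-03-13", 5)],
)
conn.commit()


def overwrite_partition(conn: sqlite3.Connection, day: str) -> None:
    """Replace one partition atomically: delete and insert commit together or not at all."""
    with conn:  # commits on success, rolls back if either statement raises
        conn.execute("DELETE FROM target WHERE partition_date = ?", (day,))
        conn.execute(
            "INSERT INTO target SELECT * FROM staging WHERE partition_date = ?", (day,)
        )


overwrite_partition(conn, "2026-03-14")
overwrite_partition(conn, "2026-03-14")  # a rerun replaces the partition instead of duplicating it
print(conn.execute("SELECT COUNT(*) FROM target").fetchone())
```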
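```python
# Pattern 3: Checkpoint (get_checkpoint, save_checkpoint, source and process are
# placeholders for your own state store, source connector and transform step)
last_checkpoint = get_checkpoint('pipeline_x')
# Prefer a bound query parameter over string interpolation if your client supports it
new_data = source.query(f"WHERE updated_at > '{last_checkpoint}'")
if len(new_data) > 0:  # an empty batch leaves the checkpoint unchanged
    process(new_data)
    # Save only after processing succeeds, so a crash replays this batch
    save_checkpoint('pipeline_x', max(new_data.updated_at))
```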
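Below is a self-contained version of the checkpoint loop. It assumes ISO-8601 text timestamps, which sort correctly as strings. An in-memory SQLite table and a plain dict stand in for the real source and checkpoint store, and every name in the example is hypothetical.

```python
import sqlite3

# Hypothetical source table standing in for the upstream system
src = sqlite3.connect(":memory:")
src.execute("CREATE TABLE events (id INTEGER, updated_at TEXT)")
src.executemany(
    "INSERT INTO events VALUES (?, ?)",
    [(1, "2026-03-14T01:00"), (2, "2026-03-14T02:00"), (3, "2026-03-14T03:00")],
)

checkpoints = {}  # in-memory stand-in for a durable checkpoint store
processed = []    # records what the pipeline handled, to show nothing is reprocessed


def run_incremental(name: str) -> int:
    last = checkpoints.get(name, "")  # empty string sorts before any ISO timestamp
    rows = src.execute(
        "SELECT id, updated_at FROM events WHERE updated_at > ? ORDER BY updated_at",
        (last,),  # bound parameter, not string formatting
    ).fetchall()
    if rows:
        processed.extend(row_id for row_id, _ in rows)
        checkpoints[name] = rows[-1][1]  # advance only after processing succeeded
    return len(rows)


print(run_incremental("pipeline_x"))  # first run picks up all three rows
print(run_incremental("pipeline_x"))  # second run finds nothing newer than the checkpoint
```

A strict `>` filter skips any row that arrives later with a timestamp equal to the saved checkpoint. Pipelines that can receive such rows often switch to `>=` and rely on an idempotent load, such as the upsert pattern, to absorb the overlap.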
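```python
import pandas as pd
import pandera as pa

schema = pa.DataFrameSchema({
    "user_id": pa.Column(int, pa.Check.gt(0), nullable=False),
    "email": pa.Column(str, pa.Check.str_matches(r'^.+@.+\..+$')),
    # "Int64" is pandas' nullable integer dtype; a plain int column cannot hold NULLs
    "age": pa.Column("Int64", pa.Check.in_range(0, 150), nullable=True),
    "created_at": pa.Column(pa.DateTime, pa.Check.less_than_or_equal_to(pd.Timestamp.now())),
})

# df is the incoming DataFrame; validate() raises pa.errors.SchemaError on the first
# violation (pass lazy=True to collect every failure before raising)
validated_df = schema.validate(df)
```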
| Dimension | Check | Tool |
|-----------|-------|------|
| Completeness | NULL ratio < threshold | Great Expectations |
| Accuracy | Value range checks | pandera |
| Freshness | Time since last update < SLA | Airflow sensor |
| Uniqueness | Duplicate check | SQL DISTINCT |
| Consistency | Cross-table referential integrity | dbt test |

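The table names a tool for each dimension, but the checks themselves are simple to compute. Below is a dependency-free sketch of three of them: completeness, uniqueness, and freshness. The records, the 0.5 NULL-ratio threshold, and the 6-hour SLA are all made-up examples.

```python
from datetime import datetime, timedelta, timezone

LOADED = datetime(2026, 3, 14, 6, tzinfo=timezone.utc)

# Hypothetical batch of records to check
rows = [
    {"user_id": 1, "email": "a@example.com", "loaded_at": LOADED},
    {"user_id": 2, "email": None, "loaded_at": LOADED},
    {"user_id": 2, "email": "b@example.com", "loaded_at": LOADED},
]


def null_ratio(rows, column):
    """Completeness: share of rows where the column is missing."""
    return sum(r[column] is None for r in rows) / len(rows)


def duplicate_keys(rows, key):
    """Uniqueness: key values that appear more than once."""
    seen, dupes = set(), set()
    for r in rows:
        if r[key] in seen:
            dupes.add(r[key])
        seen.add(r[key])
    return dupes


def is_fresh(rows, column, sla, now):
    """Freshness: the newest record must be younger than the SLA."""
    return now - max(r[column] for r in rows) <= sla


now = datetime(2026, 3, 14, 8, tzinfo=timezone.utc)
checks = {
    "completeness": null_ratio(rows, "email") < 0.5,
    "uniqueness": not duplicate_keys(rows, "user_id"),
    "freshness": is_fresh(rows, "loaded_at", timedelta(hours=6), now),
}
print(checks)
```

Here the duplicate `user_id` 2 fails the uniqueness check. The other two checks pass.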
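```python
# Airflow DAG (`schedule=` needs Airflow 2.4+). extract_fn, transform_fn, load_fn
# and validate_fn are your own callables and must be defined before this point.
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

# '0 6 * * *' runs daily at 06:00; start_date is required, this date is only an example
with DAG('daily_etl', schedule='0 6 * * *', start_date=datetime(2026, 1, 1), catchup=False) as dag:
    extract = PythonOperator(task_id='extract', python_callable=extract_fn)
    transform = PythonOperator(task_id='transform', python_callable=transform_fn)
    load = PythonOperator(task_id='load', python_callable=load_fn)
    validate = PythonOperator(task_id='validate', python_callable=validate_fn)

    # Linear chain: each task runs only after its upstream task succeeds
    extract >> transform >> load >> validate
```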
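`extract >> transform >> load >> validate` declares a linear dependency chain. Under Airflow's default `all_success` trigger rule, a task runs only if its upstream task succeeded. The toy runner below is plain Python, not Airflow. It imitates that behavior to show that a failed transform never reaches the load step, and it borrows Airflow's state names for readability.

```python
from typing import Callable, Dict, List, Tuple


def run_chain(tasks: List[Tuple[str, Callable[[], None]]]) -> Dict[str, str]:
    """Run tasks in order; once one fails, mark everything downstream as skipped."""
    states: Dict[str, str] = {}
    failed = False
    for task_id, fn in tasks:
        if failed:
            states[task_id] = "upstream_failed"
            continue
        try:
            fn()
            states[task_id] = "success"
        except Exception:
            states[task_id] = "failed"
            failed = True
    return states


def broken_transform() -> None:
    raise ValueError("bad input")  # simulate a failing transform step


states = run_chain([
    ("extract", lambda: None),
    ("transform", broken_transform),
    ("load", lambda: None),
    ("validate", lambda: None),
])
print(states)
```

The real scheduler adds retries, scheduling, and backfills (controlled by `catchup`) on top of this ordering.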