.build/direct/example/data-ingestion-pipeline/SKILL.md
Build data ingestion pipelines for batch and streaming data from multiple sources. Covers extraction strategies, format normalization, deduplication, validation gates, and staging patterns. Triggers on data ingestion, ETL pipeline, or data import architecture requests.
npx skillsauth add organvm-iv-taxis/a-i--skills data-ingestion-pipeline
Install this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
3 of 9 scanners reported clean
Some scanners were skipped, did not run, or reported a non-clean status. Review each row below.
Extract, validate, and load data from diverse sources into target systems.
Sources → Extract → Validate → Transform → Stage → Load → Verify
   │         │         │           │         │      │       │
   │         │         │           │         │      │       └─ Row counts match
   │         │         │           │         │      └─ Write to target
   │         │         │           │         └─ Staging table/file
   │         │         │           └─ Normalize, enrich, deduplicate
   │         │         └─ Schema validation, business rules
   │         └─ Pull from source
   └─ APIs, files, databases, streams
from pathlib import Path
import json
import csv
import yaml
def _read_csv_rows(path: Path) -> list[dict]:
    """Parse a CSV file into one dict per data row, closing the file afterwards."""
    # newline="" is what the csv module asks for so that newlines embedded in
    # quoted fields are handled by the reader rather than the file object.
    with path.open(newline="") as handle:
        return list(csv.DictReader(handle))


class FileExtractor:
    """Load records from a JSON, YAML, or CSV file selected by file extension."""

    # Maps a lower-case file suffix to a callable that parses the given Path.
    PARSERS = {
        ".json": lambda p: json.loads(p.read_text()),
        ".yaml": lambda p: yaml.safe_load(p.read_text()),
        ".yml": lambda p: yaml.safe_load(p.read_text()),
        ".csv": _read_csv_rows,
    }

    def extract(self, path: Path) -> list[dict]:
        """Parse *path* and return its content as a list of records.

        Args:
            path: File to read. The suffix is matched case-insensitively
                against ``PARSERS``.

        Returns:
            The parsed list if the document is a list; a one-element list
            wrapping the document otherwise; an empty list if the document
            parsed to ``None`` (for example an empty YAML file or JSON ``null``).

        Raises:
            ValueError: If the suffix has no registered parser.
        """
        parser = self.PARSERS.get(path.suffix.lower())
        if parser is None:
            raise ValueError(f"Unsupported format: {path.suffix}")
        data = parser(path)
        if data is None:
            # An empty document holds no records; do not emit a [None] row.
            return []
        return data if isinstance(data, list) else [data]
import httpx
async def extract_paginated(base_url: str, params: "dict | None" = None) -> list[dict]:
    """Fetch every page of a page-numbered JSON endpoint and collect the records.

    Pages are requested as ``page=1, 2, ...`` with ``per_page=100`` until a
    page yields no records.

    Args:
        base_url: Endpoint URL to query.
        params: Extra query parameters sent with every request. The mapping
            is copied and never mutated.

    Returns:
        All records from all pages, in the order received.

    Raises:
        httpx.HTTPStatusError: If any response has an error status.
        ValueError: If a page is a non-empty JSON object that carries its
            records under neither ``"items"`` nor ``"results"``.
    """
    base_params = dict(params or {})
    all_records: list[dict] = []
    page = 1
    async with httpx.AsyncClient() as client:
        while True:
            response = await client.get(
                base_url,
                params={**base_params, "page": page, "per_page": 100},
            )
            response.raise_for_status()
            data = response.json()
            if isinstance(data, dict):
                # Envelope form: records live under "items" or "results".
                records = data.get("items", data.get("results", data))
            else:
                # Bare form: the payload itself is the list of records.
                records = data
            if not records:
                break
            if isinstance(records, dict):
                # Extending with a dict would append its keys, not records,
                # and pagination would never terminate on such an API.
                raise ValueError(
                    "Unrecognized page payload: expected a list or an object "
                    "with 'items' or 'results'"
                )
            all_records.extend(records)
            page += 1
    return all_records
import asyncpg
async def extract_from_db(dsn: str, query: str, batch_size: int = 1000):
    """Stream the rows of *query* one at a time as plain dicts.

    This is an async generator: iterate it with ``async for``.

    Args:
        dsn: Connection string passed to ``asyncpg.connect``.
        query: SQL text to execute.
        batch_size: Passed as the cursor's ``prefetch`` value.

    Yields:
        One ``dict`` per result row.
    """
    conn = await asyncpg.connect(dsn)
    try:
        # asyncpg cursors are only usable inside a transaction; iterating one
        # outside a transaction fails before the first row is fetched.
        async with conn.transaction():
            async for row in conn.cursor(query, prefetch=batch_size):
                yield dict(row)
    finally:
        # Close the connection even if the consumer stops iterating early
        # or the query raises.
        await conn.close()
from dataclasses import dataclass
@dataclass
class ValidationResult:
    """Outcome of a validation pass, split into accepted and rejected records."""

    # Records that passed every check.
    valid: list[dict]
    # Rejected records, each paired with its error message: (record, error_message).
    invalid: list[tuple[dict, str]]
def _type_label(expected) -> str:
    """Return a readable name for a type, or for a tuple of alternative types."""
    if isinstance(expected, tuple):
        return " or ".join(_type_label(option) for option in expected)
    # Some objects accepted by isinstance have no __name__; fall back to repr.
    return getattr(expected, "__name__", repr(expected))


def validate_records(records: list[dict], schema: dict) -> ValidationResult:
    """Check each record against *schema* and split them into valid and invalid.

    Args:
        records: Records to check.
        schema: Mapping with two optional keys. ``"required"`` is a list of
            field names that must be present and not ``None``. ``"fields"``
            maps a field name to its rules: ``"type"`` (a type, or a tuple
            of types, handed to ``isinstance``) and ``"max_length"`` (an
            upper bound on ``len(str(value))``). Rules are applied only to
            fields that are present and not ``None``.

    Returns:
        A ``ValidationResult``. Each invalid entry pairs the record with all
        of its error messages joined by ``"; "``.
    """
    result = ValidationResult(valid=[], invalid=[])
    required_fields = schema.get("required", [])
    field_rules = schema.get("fields", {})
    for record in records:
        errors = []
        for field in required_fields:
            if field not in record or record[field] is None:
                errors.append(f"Missing required field: {field}")
        for field, rules in field_rules.items():
            value = record.get(field)
            if value is None:
                # Absent or null fields are the "required" check's concern.
                continue
            if "type" in rules and not isinstance(value, rules["type"]):
                # _type_label copes with tuples of types, which isinstance
                # accepts but which have no __name__ attribute.
                errors.append(f"{field}: expected {_type_label(rules['type'])}")
            if "max_length" in rules and len(str(value)) > rules["max_length"]:
                errors.append(f"{field}: exceeds max length {rules['max_length']}")
        if errors:
            result.invalid.append((record, "; ".join(errors)))
        else:
            result.valid.append(record)
    return result
def apply_business_rules(records: list[dict]) -> ValidationResult:
    """Split *records* by whether their ``organ`` and ``status`` are allowed.

    Args:
        records: Records to check; ``"organ"`` and ``"status"`` are read
            with ``dict.get``, so a missing key is treated as ``None``.

    Returns:
        A ``ValidationResult``. Each invalid entry pairs the record with its
        error messages joined by ``"; "``.
    """
    # Built once rather than on every iteration.
    valid_organs = frozenset({"I", "II", "III", "IV", "V", "VI", "VII", "META"})
    valid_statuses = frozenset(
        {"LOCAL", "CANDIDATE", "PUBLIC_PROCESS", "GRADUATED", "ARCHIVED"}
    )

    result = ValidationResult(valid=[], invalid=[])
    for record in records:
        errors = []
        # Example: organ must be valid
        organ = record.get("organ")
        # The isinstance guard keeps unhashable values (lists, dicts) from
        # raising TypeError in the set lookup; they are simply invalid.
        if not (isinstance(organ, str) and organ in valid_organs):
            errors.append(f"Invalid organ: {organ}")
        # Example: status must follow promotion state machine
        status = record.get("status")
        if not (isinstance(status, str) and status in valid_statuses):
            errors.append(f"Invalid status: {status}")
        if errors:
            result.invalid.append((record, "; ".join(errors)))
        else:
            result.valid.append(record)
    return result
def deduplicate(records: list[dict], key_fields: list[str]) -> list[dict]:
    """Return *records* with later repeats of the same key removed.

    The key of a record is the tuple of its values for *key_fields*, read
    with ``dict.get`` (a missing field counts as ``None``). The first record
    seen for each key is kept and the original order is preserved.
    """
    first_by_key: dict[tuple, dict] = {}
    for row in records:
        identity = tuple(row.get(name) for name in key_fields)
        # setdefault only stores the row when the key has not been seen yet.
        first_by_key.setdefault(identity, row)
    # Dicts keep insertion order, so this is first-occurrence order.
    return list(first_by_key.values())
from enum import Enum
class MergeStrategy(str, Enum):
    """Names the ways a group of duplicate records can be collapsed into one.

    Members are also ``str`` instances, so each compares equal to its value.
    """

    KEEP_FIRST = "keep_first"
    KEEP_LATEST = "keep_latest"
    MERGE_FIELDS = "merge_fields"
def merge_duplicates(records: list[dict], key_fields: list[str], strategy: MergeStrategy) -> list[dict]:
    """Group *records* by key and collapse each group into a single record.

    Args:
        records: Records to merge.
        key_fields: Fields whose values (read with ``dict.get``) form the
            grouping key.
        strategy: A ``MergeStrategy`` member or its string value.
            ``KEEP_FIRST`` keeps the first record of each group,
            ``KEEP_LATEST`` keeps the last one, and ``MERGE_FIELDS`` builds
            a new dict in which later non-``None`` values overwrite earlier
            ones.

    Returns:
        One record per distinct key, in order of first appearance.

    Raises:
        ValueError: If *strategy* is not a valid ``MergeStrategy``.
    """
    # Validate up front: an unknown strategy would otherwise match no branch
    # below and silently produce an empty result.
    strategy = MergeStrategy(strategy)

    groups: dict[tuple, list[dict]] = {}
    for record in records:
        key = tuple(record.get(f) for f in key_fields)
        groups.setdefault(key, []).append(record)

    merged = []
    for group in groups.values():
        if strategy is MergeStrategy.KEEP_FIRST:
            merged.append(group[0])
        elif strategy is MergeStrategy.KEEP_LATEST:
            merged.append(group[-1])
        elif strategy is MergeStrategy.MERGE_FIELDS:
            combined = {}
            for record in group:
                for k, v in record.items():
                    if v is not None:
                        combined[k] = v
            merged.append(combined)
        else:
            # Reached only if the enum gains a member not handled here.
            raise ValueError(f"Unsupported merge strategy: {strategy!r}")
    return merged
from pathlib import Path
from datetime import datetime
class StagingArea:
    """Filesystem staging area holding one directory per batch.

    Each batch directory contains ``data.json`` (the records) and
    ``metadata.json`` (batch id, record count, status, and timestamps).
    """

    def __init__(self, base_dir: str):
        """Remember *base_dir* as the root under which batches are staged."""
        self.base = Path(base_dir)

    @staticmethod
    def _timestamp() -> str:
        """Return the current time as a timezone-aware UTC ISO-8601 string."""
        # Local import: the file's visible imports bring in only `datetime`.
        from datetime import timezone

        # Aware UTC timestamps stay unambiguous across hosts and DST changes.
        return datetime.now(timezone.utc).isoformat()

    def stage(self, batch_id: str, records: list[dict]) -> Path:
        """Write *records* and their metadata under ``<base>/<batch_id>``.

        The directory is created if needed; existing files are overwritten.
        Values that are not JSON-serializable are written via ``str()``.

        Returns:
            The batch directory.
        """
        stage_dir = self.base / batch_id
        stage_dir.mkdir(parents=True, exist_ok=True)
        data_path = stage_dir / "data.json"
        meta_path = stage_dir / "metadata.json"
        data_path.write_text(json.dumps(records, indent=2, default=str))
        # indent=2 keeps the file formatted the same way promote() rewrites it.
        meta_path.write_text(json.dumps({
            "batch_id": batch_id,
            "record_count": len(records),
            "staged_at": self._timestamp(),
            "status": "staged",
        }, indent=2))
        return stage_dir

    def promote(self, batch_id: str) -> list[dict]:
        """Mark a staged batch as promoted and return its records.

        Rewrites ``metadata.json`` with ``status`` set to ``"promoted"`` and
        a ``promoted_at`` timestamp added.

        Raises:
            FileNotFoundError: If the batch's files do not exist.
        """
        stage_dir = self.base / batch_id
        meta_path = stage_dir / "metadata.json"
        data = json.loads((stage_dir / "data.json").read_text())
        meta = json.loads(meta_path.read_text())
        meta["status"] = "promoted"
        meta["promoted_at"] = self._timestamp()
        meta_path.write_text(json.dumps(meta, indent=2))
        return data
class IngestionPipeline:
    """Run extract, validate, transform, deduplicate, and load in sequence.

    The collaborators are duck-typed: ``extractor.extract(source)`` and
    ``loader.load(records)`` are awaited; ``validator.validate(records)``
    must return an object with ``valid`` and ``invalid`` attributes;
    ``transformer.transform(records)`` returns the records to load.
    """

    def __init__(self, extractor, validator, transformer, loader, key_fields=None):
        """Store the stage collaborators.

        Args:
            extractor: Object with an awaitable ``extract(source)``.
            validator: Object with ``validate(records)``.
            transformer: Object with ``transform(records)``.
            loader: Object with an awaitable ``load(records)``.
            key_fields: Field names that identify a record for
                deduplication. Defaults to ``["id"]``.
        """
        self.extractor = extractor
        self.validator = validator
        self.transformer = transformer
        self.loader = loader
        # Copy so that later changes to the caller's list do not leak in.
        self.key_fields = list(key_fields) if key_fields is not None else ["id"]

    async def run(self, source: str) -> dict:
        """Ingest *source* end to end and return summary counts.

        Invalid records are logged and dropped; only valid ones continue.

        Returns:
            A dict with ``"extracted"``, ``"valid"``, and ``"invalid"``
            counts, plus ``"loaded"`` holding whatever ``loader.load``
            returned.
        """
        # Extract
        raw = await self.extractor.extract(source)
        # Validate
        validation = self.validator.validate(raw)
        if validation.invalid:
            # NOTE(review): `log` is not defined in the visible source; the
            # keyword-argument call suggests a structlog-style logger
            # defined elsewhere. Confirm that it exists.
            log.warning("validation_failures", count=len(validation.invalid))
        # Transform
        transformed = self.transformer.transform(validation.valid)
        # Deduplicate
        unique = deduplicate(transformed, key_fields=self.key_fields)
        # Load
        loaded = await self.loader.load(unique)
        return {
            "extracted": len(raw),
            "valid": len(validation.valid),
            "invalid": len(validation.invalid),
            "loaded": loaded,
        }
development
Optimize resumes and CVs for impact, ATS compatibility, and audience targeting. Supports multiple formats (chronological, functional, hybrid), accomplishment framing (STAR/XYZ), and tailoring for specific roles. Triggers on resume review, CV update, job application prep, or career document requests.
testing
Transfer context between AI agent sessions with structured handoff protocols, state serialization, and decision log preservation. Covers multi-agent coordination, context compression, and continuity patterns. Triggers on agent handoff, session transfer, or multi-agent continuity requests.
tools
Craft compelling fiction and creative nonfiction with attention to structure, voice, prose style, and revision. Supports short stories, novel chapters, essays, and hybrid forms. Triggers on creative writing, fiction writing, story craft, prose style, or literary technique requests.
devops
Transform AI conversations and chat transcripts into publishable content including blog posts, documentation, tutorials, and knowledge base entries. Covers extraction, restructuring, and editorial refinement. Triggers on conversation-to-content, transcript processing, or chat-to-doc requests.