claude-project/skills/data-engineering/etl-patterns/SKILL.md
ETL workflow patterns, data pipeline architecture, and ingestion strategies for the Somali Dialect Classifier project. Covers source integration, transformation logic, staging patterns, and load strategies. Auto-invokes when discussing data pipelines, ETL, ingestion workflows, or data processing architecture.
1. Extract (Raw Layer)
data/raw/[source-name]/
2. Transform (Staging/Silver Layer)
data/staging/ or data/processed/
3. Load (Gold Layer)
data/final/ or data/gold/
Pattern 1: Web Scraping (Wikipedia, News)
def extract_from_web(url, source_name):
    """Extract text from web sources"""
    raw_data = fetch_url(url)
    save_raw(raw_data, f'data/raw/{source_name}/')
    return raw_data
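The `save_raw` helper used by the extraction patterns is not defined in this skill. A minimal sketch, assuming each raw payload is written to its own timestamped file so that repeated runs never overwrite earlier extracts. The file naming scheme and the choice of `.txt` versus `.json` are assumptions made for illustration, not part of the original skill:

```python
import json
from datetime import datetime, timezone
from pathlib import Path


def save_raw(raw_data, target_dir):
    """Write one raw payload to target_dir and return the file path.

    Hypothetical helper: the naming scheme is an assumption.
    """
    directory = Path(target_dir)
    directory.mkdir(parents=True, exist_ok=True)

    # A UTC timestamp with microseconds keeps repeated runs from colliding.
    stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S%fZ")

    # Strings are stored verbatim; anything else is serialized as JSON.
    is_text = isinstance(raw_data, str)
    path = directory / f"{stamp}.{'txt' if is_text else 'json'}"
    body = raw_data if is_text else json.dumps(raw_data, ensure_ascii=False)
    path.write_text(body, encoding="utf-8")
    return path
```

Keeping the raw layer append-only in this way means a transformation bug can be fixed and replayed without fetching from the source again.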
Pattern 2: API Integration (HuggingFace, Språkbanken)
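import requests

def extract_from_api(endpoint, api_key, source_name):
    """Extract from external API"""
    response = requests.get(endpoint, headers={'Authorization': api_key}, timeout=30)
    response.raise_for_status()  # Fail early on 4xx/5xx instead of saving an error body
    payload = response.json()    # Parse once and reuse
    save_raw(payload, f'data/raw/{source_name}/')
    return payload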
Pattern 3: File Upload (Manual Datasets)
def extract_from_file(file_path, source_name):
    """Extract from uploaded files"""
    with open(file_path, 'r', encoding='utf-8') as f:
        raw_data = f.read()
    save_raw(raw_data, f'data/raw/{source_name}/')
    return raw_data
import unicodedata

def transform_text(raw_text):
    """Standard cleaning pipeline"""
    # 1. Remove HTML tags
    text = remove_html_tags(raw_text)
    # 2. Remove URLs (before collapsing whitespace, so no gaps are left behind)
    text = remove_urls(text)
    # 3. Normalize Unicode to NFC so equivalent character sequences compare equal
    text = unicodedata.normalize('NFC', text)
    # 4. Normalize whitespace
    text = ' '.join(text.split())
    return text
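The cleaning pipeline calls `remove_html_tags` and `remove_urls` without defining them. One possible sketch using only the standard library follows. The regular expressions are assumptions chosen for illustration: they are deliberately simple and will not handle every case (for example, the contents of `<script>` blocks are left in place), so a real HTML parser may be a better fit for messy pages.

```python
import html
import re

# Hypothetical patterns: simple on purpose, not a full HTML or URL grammar.
TAG_RE = re.compile(r"<[^>]+>")
URL_RE = re.compile(r"(?:https?://|www\.)\S+")


def remove_html_tags(text):
    """Replace tags with a space, then decode entities such as &amp;."""
    return html.unescape(TAG_RE.sub(" ", text))


def remove_urls(text):
    """Drop http(s):// and www. links."""
    return URL_RE.sub("", text)
```

Replacing tags with a space rather than an empty string keeps words in adjacent elements from being glued together; the whitespace normalization step collapses the extra spaces afterwards.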
def validate_and_filter(records):
    """Apply quality guardrails"""
    validated = []
    for record in records:
        # Language detection
        if not is_somali(record['text']):
            continue
        # Quality scoring
        score = compute_quality_score(record['text'])
        if score < 5:
            continue
        # Duplicate detection
        if is_duplicate(record['text'], validated):
            continue
        validated.append(record)
    return validated
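The `is_duplicate(text, validated)` call above compares each candidate against the list of accepted records, which gets slower as the list grows. A common alternative for exact duplicates is to keep a set of content hashes. The sketch below assumes that two texts count as duplicates when they match after case folding and whitespace normalization; the function names are hypothetical, and near-duplicate detection would need a different technique.

```python
import hashlib


def text_fingerprint(text):
    """Hash a case-folded, whitespace-normalized copy of the text."""
    normalized = " ".join(text.casefold().split())
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()


def drop_exact_duplicates(records):
    """Keep the first record for each fingerprint, preserving order."""
    seen = set()
    unique = []
    for record in records:
        fingerprint = text_fingerprint(record["text"])
        if fingerprint in seen:
            continue
        seen.add(fingerprint)
        unique.append(record)
    return unique
```

Because set membership is checked in roughly constant time, the cost stays linear in the number of records.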
from datetime import datetime

def enrich_record(record):
    """Add metadata and features"""
    record['word_count'] = len(record['text'].split())
    record['char_count'] = len(record['text'])
    record['quality_score'] = compute_quality_score(record['text'])
    record['ingestion_timestamp'] = datetime.now().isoformat()
    return record
def create_splits(data, train_ratio=0.7, val_ratio=0.15, test_ratio=0.15):
    """Stratified split by dialect"""
    from sklearn.model_selection import train_test_split
    # First split: train vs. (val + test)
    train, temp = train_test_split(
        data,
        train_size=train_ratio,
        stratify=data['label'],
        random_state=42
    )
    # Second split: val vs. test
    val, test = train_test_split(
        temp,
        train_size=val_ratio/(val_ratio + test_ratio),
        stratify=temp['label'],
        random_state=42
    )
    return train, val, test
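The second split passes `val_ratio / (val_ratio + test_ratio)` because the ratios are defined against the full dataset, while the second call only sees the held-out remainder. With the defaults, the remainder is 30% of the data and validation should take 0.15 / 0.30 = 0.5 of it. Note also that `create_splits` never checks that the three ratios sum to 1. The two small helpers below are hypothetical additions that make both points explicit:

```python
def second_split_fraction(val_ratio, test_ratio):
    """Fraction of the held-out remainder that should become validation."""
    return val_ratio / (val_ratio + test_ratio)


def check_ratios(train_ratio, val_ratio, test_ratio, tolerance=1e-9):
    """Raise ValueError if the three ratios do not sum to 1."""
    total = train_ratio + val_ratio + test_ratio
    if abs(total - 1.0) > tolerance:
        raise ValueError(f"split ratios sum to {total}, expected 1.0")
```

Calling `check_ratios` at the top of `create_splits` would catch a call such as `create_splits(data, 0.7, 0.2, 0.2)`, which otherwise runs without error and yields 15% validation and 15% test rather than the requested sizes.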
import json
import pandas as pd

def export_for_training(data, output_path):
    """Export to format expected by model"""
    # `data` is assumed to be a list of dict records
    # Option 1: JSON Lines
    with open(f'{output_path}/data.jsonl', 'w', encoding='utf-8') as f:
        for record in data:
            f.write(json.dumps(record, ensure_ascii=False) + '\n')
    # Option 2: CSV
    df = pd.DataFrame(data)
    df.to_csv(f'{output_path}/data.csv', index=False)
    # Option 3: Parquet (efficient for large datasets; needs pyarrow or fastparquet)
    df.to_parquet(f'{output_path}/data.parquet')
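A quick way to sanity-check a JSON Lines export is to read it back and compare record counts. The reader below is a minimal sketch using only the standard library; `read_jsonl` is a hypothetical helper name, not something defined elsewhere in this skill.

```python
import json


def read_jsonl(path):
    """Yield one dict per non-empty line of a JSON Lines file."""
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if line:
                yield json.loads(line)
```

Because it is a generator, the file is streamed one line at a time, so even a large export can be counted with `sum(1 for _ in read_jsonl(path))` without loading it into memory.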
from datetime import datetime

def incremental_etl(source, last_run_timestamp):
    """Process only new data since last run"""
    # Capture the start time first, so records that arrive mid-run
    # are picked up by the next run instead of being skipped
    run_started = datetime.now()
    # 1. Extract new records
    new_records = extract_since(source, last_run_timestamp)
    # 2. Transform
    transformed = transform_batch(new_records)
    # 3. Append to existing dataset
    append_to_dataset(transformed, 'data/processed/dataset.jsonl')
    # 4. Update last run timestamp
    update_last_run(source, run_started)
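The incremental pattern depends on persisting a last-run timestamp per source, which the skill leaves to `update_last_run`. A minimal sketch, assuming the state lives in a small JSON file and that `source` is a plain string name. The file location `data/etl_state.json` and the companion `get_last_run` function are hypothetical:

```python
import json
from datetime import datetime, timezone
from pathlib import Path

STATE_FILE = Path("data/etl_state.json")  # hypothetical location


def get_last_run(source, state_file=STATE_FILE):
    """Return the stored timestamp for source, or None on the first run."""
    if not state_file.exists():
        return None
    state = json.loads(state_file.read_text(encoding="utf-8"))
    value = state.get(source)
    return datetime.fromisoformat(value) if value else None


def update_last_run(source, timestamp, state_file=STATE_FILE):
    """Record timestamp as the high-water mark for source."""
    state = {}
    if state_file.exists():
        state = json.loads(state_file.read_text(encoding="utf-8"))
    state[source] = timestamp.isoformat()
    state_file.parent.mkdir(parents=True, exist_ok=True)
    state_file.write_text(json.dumps(state, indent=2), encoding="utf-8")
```

A `None` result from `get_last_run` can be treated as "extract everything", which makes the first run and later incremental runs share one code path.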
def robust_etl_pipeline(sources):
    """ETL with error handling and logging"""
    results = {'success': [], 'failed': []}
    for source in sources:
        try:
            # Extract
            raw_data = extract(source)
            log_info(f"Extracted {len(raw_data)} records from {source['name']}")
            # Transform
            transformed = transform(raw_data)
            log_info(f"Transformed {len(transformed)} records")
            # Load
            load(transformed, source['name'])
            log_info(f"Loaded {len(transformed)} records")
            results['success'].append(source['name'])
        except Exception as e:
            log_error(f"Failed to process {source['name']}: {str(e)}")
            results['failed'].append((source['name'], str(e)))
    return results
Per-Source:
Overall Pipeline:
Example Log:
[2025-11-06 19:00:00] INFO: Starting ETL pipeline
[2025-11-06 19:00:15] INFO: Wikipedia - Extracted 5,000 records
[2025-11-06 19:00:45] INFO: Wikipedia - Transformed 4,800 records (200 filtered)
[2025-11-06 19:01:00] INFO: Wikipedia - Loaded 4,800 records
[2025-11-06 19:01:05] INFO: BBC Somali - Extracted 2,500 records
[2025-11-06 19:01:25] INFO: BBC Somali - Transformed 2,450 records (50 filtered)
[2025-11-06 19:01:35] INFO: BBC Somali - Loaded 2,450 records
[2025-11-06 19:01:40] INFO: Pipeline completed: 7,250 records loaded
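The log format above can be reproduced with Python's standard `logging` module. This is a sketch under the assumption that `log_info` and `log_error` are thin wrappers around a shared logger; the logger name `etl` and the `build_etl_logger` function are hypothetical.

```python
import logging


def build_etl_logger(name="etl"):
    """Return a logger that prints '[YYYY-MM-DD HH:MM:SS] LEVEL: message'."""
    logger = logging.getLogger(name)
    if not logger.handlers:  # Avoid stacking handlers on repeated calls
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter(
            fmt="[%(asctime)s] %(levelname)s: %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        ))
        logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    return logger


logger = build_etl_logger()
log_info = logger.info
log_error = logger.error
```

Record counts with thousands separators, as in the example log, can be produced with an f-string such as `log_info(f"Wikipedia - Extracted {count:,} records")`.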
data/
├── raw/ # Unprocessed source data
│ ├── wikipedia/
│ ├── bbc-somali/
│ ├── huggingface/
│ └── sprakbanken/
│
├── staging/ # Cleaned, validated data
│ └── cleaned_data.jsonl
│
├── processed/ # Deduplicated, enriched data
│ └── processed_data.jsonl
│
└── final/ # Train/val/test splits
    ├── train.jsonl
    ├── val.jsonl
    └── test.jsonl
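The directory layout above can be created up front so that every pipeline stage finds its target folder. A small sketch, with the source names taken from the tree; the function name and the `root` parameter are hypothetical:

```python
from pathlib import Path

RAW_SOURCES = ("wikipedia", "bbc-somali", "huggingface", "sprakbanken")
LAYERS = ("staging", "processed", "final")


def create_data_layout(root="data"):
    """Create the raw/staging/processed/final folders and return their paths."""
    root = Path(root)
    directories = [root / "raw" / source for source in RAW_SOURCES]
    directories += [root / layer for layer in LAYERS]
    for directory in directories:
        # exist_ok makes the call safe to repeat on an existing layout
        directory.mkdir(parents=True, exist_ok=True)
    return directories
```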
This skill auto-invokes when you mention:
Version: 1.0.0
Last Updated: 2025-11-06
Project: Somali Dialect Classifier