dev-toolkit/skills/python-pipeline/SKILL.md
Python data processing pipelines with modular architecture. Use when building content processing workflows, implementing dispatcher patterns, integrating Google Sheets/Drive APIs, or creating batch processing systems. Covers patterns from rosen-scraper, image-analyzer, and social-scraper projects.
npx skillsauth add jamditis/claude-skills-journalism python-pipelineInstall this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
3 of 9 scanners reported clean
Some scanners were skipped, did not run, or reported a non-clean status. Review each row below.
Patterns for building production-quality data processing pipelines with Python.
Targeted at Python 3.11+ for asyncio.TaskGroup and exception groups; Python 3.12+ for the lighter type X = ... syntax. Pin a 3.13+ runtime if you want the JIT or experimental free-threading; the patterns here don't depend on either.
For a long time pandas was the default for any tabular work in Python. As of 2026 the default has shifted: polars is the right pick for multi-GB pipelines on a single machine, DuckDB is the right pick when SQL or larger-than-RAM scans are involved, and pandas stays useful for small data and the ML/notebook ecosystem (scikit-learn, statsmodels, plotnine all speak it natively).
| Tool | When | Why | |---|---|---| | pandas | < ~1 GB data, ML interop, single-threaded familiarity | Mature, ubiquitous, eager DataFrame model. Slowest in benchmarks but most ecosystem support. | | polars | 1 GB - tens of GB on one box, performance-critical pipelines | Multithreaded by default, lazy query engine, Arrow-native. ~5x speedup over pandas on filter / aggregate at 100M rows. | | DuckDB | SQL workflows, larger-than-RAM, parquet/CSV scanning, joins across many files | Vectorized + pipelined execution, cost-based optimizer, streaming scans. Works great as a thin wrapper over a directory of parquet files. |
All three speak Apache Arrow, so zero-copy interop between them is the pragmatic answer most of the time:
import polars as pl
import duckdb
# Polars: read a directory of CSVs, filter, group
df = (
pl.scan_csv('data/articles_*.csv')
.filter(pl.col('published_at') >= '2026-01-01')
.group_by('source')
.agg(pl.len().alias('count'), pl.col('word_count').mean())
.collect()
)
# DuckDB: same shape with SQL, no intermediate copy
con = duckdb.connect()
df = con.execute("""
SELECT source, COUNT(*) AS count, AVG(word_count) AS avg_wc
FROM 'data/articles_*.csv'
WHERE published_at >= '2026-01-01'
GROUP BY source
""").pl() # returns a Polars DataFrame; use .df() for pandas
# Hand off to pandas only at the boundary that needs it (e.g. scikit-learn)
import pandas as pd
pdf = df.to_pandas()
If your pipeline already uses pandas everywhere, don't pre-emptively rewrite. Migrate the bottleneck stages first — typically the CSV-load + filter step.
src/
├── workflow.py # Main orchestrator
├── dispatcher.py # Content-type router
├── processors/
│ ├── __init__.py
│ ├── base.py # Abstract base class
│ ├── article_processor.py
│ ├── video_processor.py
│ └── audio_processor.py
├── services/
│ ├── sheets_service.py # Google Sheets integration
│ ├── drive_service.py # Google Drive integration
│ └── ai_service.py # Gemini API wrapper
├── utils/
│ ├── logger.py
│ └── rate_limiter.py
└── config.py # Environment configuration
from typing import Protocol
from urllib.parse import urlparse
class Processor(Protocol):
def can_process(self, url: str) -> bool: ...
def process(self, url: str, metadata: dict) -> dict: ...
class Dispatcher:
def __init__(self):
self.processors: list[Processor] = [
ArticleProcessor(),
VideoProcessor(),
AudioProcessor(),
SocialProcessor(),
]
def dispatch(self, url: str, metadata: dict) -> dict:
for processor in self.processors:
if processor.can_process(url):
return processor.process(url, metadata)
raise ValueError(f"No processor found for URL: {url}")
# Pattern-based routing
class ArticleProcessor:
DOMAINS = ['nytimes.com', 'washingtonpost.com', 'medium.com']
def can_process(self, url: str) -> bool:
domain = urlparse(url).netloc.replace('www.', '')
return any(d in domain for d in self.DOMAINS)
import csv
from pathlib import Path
from dataclasses import dataclass, asdict
from typing import Iterator
@dataclass
class Record:
id: str
url: str
title: str | None = None
content: str | None = None
status: str = 'pending'
def read_input(path: Path) -> Iterator[Record]:
with open(path, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
for row in reader:
yield Record(**{k: v for k, v in row.items() if k in Record.__annotations__})
def write_output(records: list[Record], path: Path):
with open(path, 'w', encoding='utf-8', newline='') as f:
writer = csv.DictWriter(f, fieldnames=list(Record.__annotations__.keys()))
writer.writeheader()
writer.writerows(asdict(r) for r in records)
def process_batch(input_path: Path, output_path: Path):
dispatcher = Dispatcher()
results = []
for record in read_input(input_path):
try:
processed = dispatcher.dispatch(record.url, asdict(record))
record.status = 'completed'
record.title = processed.get('title')
record.content = processed.get('content')
except Exception as e:
record.status = f'failed: {e}'
results.append(record)
write_output(results, output_path)
import gspread
from google.oauth2.service_account import Credentials
SCOPES = [
'https://www.googleapis.com/auth/spreadsheets',
'https://www.googleapis.com/auth/drive'
]
class SheetsService:
def __init__(self, credentials_path: str):
creds = Credentials.from_service_account_file(credentials_path, scopes=SCOPES)
self.client = gspread.authorize(creds)
def get_worksheet(self, spreadsheet_id: str, sheet_name: str):
spreadsheet = self.client.open_by_key(spreadsheet_id)
return spreadsheet.worksheet(sheet_name)
def read_all(self, worksheet) -> list[dict]:
return worksheet.get_all_records()
def append_row(self, worksheet, row: list):
worksheet.append_row(row, value_input_option='USER_ENTERED')
def batch_update(self, worksheet, updates: list[dict]):
"""Update multiple cells efficiently."""
# Format: [{'range': 'A1', 'values': [[value]]}]
worksheet.batch_update(updates, value_input_option='USER_ENTERED')
def find_row_by_id(self, worksheet, id_value: str, id_column: int = 1) -> int | None:
"""Find row number by ID value."""
try:
cell = worksheet.find(id_value, in_column=id_column)
return cell.row
except gspread.CellNotFound:
return None
import time
from functools import wraps
from ratelimit import limits, sleep_and_retry
# Simple rate limiter
@sleep_and_retry
@limits(calls=10, period=60) # 10 calls per minute
def rate_limited_api_call(url: str):
return requests.get(url)
# Custom rate limiter with backoff
class RateLimiter:
def __init__(self, calls_per_minute: int = 10):
self.delay = 60 / calls_per_minute
self.last_call = 0
def wait(self):
elapsed = time.time() - self.last_call
if elapsed < self.delay:
time.sleep(self.delay - elapsed)
self.last_call = time.time()
# Usage
limiter = RateLimiter(calls_per_minute=10)
def fetch_with_rate_limit(url: str):
limiter.wait()
return requests.get(url)
For I/O-bound stages (HTTP fetches, API calls), asyncio.TaskGroup plus httpx.AsyncClient runs many requests in parallel without the boilerplate of asyncio.gather. TaskGroup's structured-concurrency model means an exception in one task cancels the rest and surfaces as an ExceptionGroup — easier to reason about than gather(return_exceptions=True).
import asyncio
import httpx
async def fetch_one(client: httpx.AsyncClient, url: str) -> tuple[str, str | Exception]:
try:
response = await client.get(url, timeout=30)
response.raise_for_status()
return (url, response.text)
except Exception as e:
return (url, e)
async def fetch_many(urls: list[str], concurrency: int = 10) -> dict[str, str | Exception]:
results: dict[str, str | Exception] = {}
sem = asyncio.Semaphore(concurrency)
async def _bounded(client: httpx.AsyncClient, url: str):
async with sem:
url, body = await fetch_one(client, url)
results[url] = body
async with httpx.AsyncClient(http2=True, timeout=30) as client:
async with asyncio.TaskGroup() as tg:
for url in urls:
tg.create_task(_bounded(client, url))
return results
# Usage
urls = ['https://example.com/a', 'https://example.com/b', ...]
data = asyncio.run(fetch_many(urls, concurrency=20))
Pair with aiolimiter if you need a true requests-per-second cap (semaphore alone bounds concurrency, not rate). For exponential-backoff retries, wrap fetch_one with tenacity.AsyncRetrying.
import json
from pathlib import Path
class ProgressTracker:
def __init__(self, progress_file: Path):
self.progress_file = progress_file
self.state = self._load()
def _load(self) -> dict:
if self.progress_file.exists():
return json.loads(self.progress_file.read_text())
return {'processed_ids': [], 'last_row': 0, 'errors': []}
def save(self):
self.progress_file.write_text(json.dumps(self.state, indent=2))
def mark_processed(self, record_id: str):
self.state['processed_ids'].append(record_id)
self.save()
def is_processed(self, record_id: str) -> bool:
return record_id in self.state['processed_ids']
def log_error(self, record_id: str, error: str):
self.state['errors'].append({'id': record_id, 'error': error})
self.save()
# Usage in workflow
tracker = ProgressTracker(Path('progress.json'))
for record in records:
if tracker.is_processed(record.id):
continue # Skip already processed
try:
process(record)
tracker.mark_processed(record.id)
except Exception as e:
tracker.log_error(record.id, str(e))
The google-generativeai package was deprecated August 31, 2025 and the unified google-genai SDK replaced it. New code should target google-genai:
pip install google-genai
import os
import json
from google import genai
from google.genai import types
# Client carries config (API key, project, location). Reuse across calls.
client = genai.Client(api_key=os.environ['GEMINI_API_KEY'])
# Pick a current model. Names drift; check ai.google.dev/gemini-api/docs/models
# for the active list. gemini-2.5-flash is a reasonable cost-efficient default.
DEFAULT_MODEL = 'gemini-2.5-flash'
class AIService:
def __init__(self, model: str = DEFAULT_MODEL):
self.model = model
def categorize(self, text: str, taxonomy: dict) -> dict:
prompt = f"""Analyze this content and categorize it.
Content:
{text[:10000]}
Taxonomy:
{json.dumps(taxonomy, indent=2)}
Respond with JSON containing:
- category: one of the taxonomy categories
- tags: list of relevant tags
- summary: 2-3 sentence summary
"""
response = client.models.generate_content(
model=self.model,
contents=prompt,
config=types.GenerateContentConfig(response_mime_type='application/json'),
)
return json.loads(response.text)
def extract_entities(self, text: str) -> list[dict]:
prompt = f"""Extract named entities from this text.
Text:
{text[:10000]}
For each entity, provide:
- name: entity name
- type: Person, Organization, Location, Event, Work, or Concept
- prominence: 1-10 score based on importance in text
Respond with JSON array of entities.
"""
response = client.models.generate_content(
model=self.model,
contents=prompt,
config=types.GenerateContentConfig(response_mime_type='application/json'),
)
return json.loads(response.text)
# Batch processing with token-usage tracking (cost varies by model and time;
# look up live pricing rather than hardcoding a per-1k figure).
class BatchAIProcessor:
def __init__(self, ai_service: AIService):
self.ai = ai_service
self.input_tokens = 0
self.output_tokens = 0
def process_batch(
self, items: list[str], prompt_template: str
) -> list[dict]:
"""Render each item into prompt_template via .format(item=...).
prompt_template must instruct the model to return JSON, since this
method enforces response_mime_type='application/json'.
"""
results = []
for item in items:
response = client.models.generate_content(
model=self.ai.model,
contents=prompt_template.format(item=item),
config=types.GenerateContentConfig(
response_mime_type='application/json'
),
)
usage = response.usage_metadata
self.input_tokens += usage.prompt_token_count or 0
self.output_tokens += usage.candidates_token_count or 0
results.append(json.loads(response.text))
return results
response.usage_metadata carries the actual token counts, which is more accurate than length heuristics. Without response_mime_type='application/json', Gemini returns prose (often wrapped in markdown fences) and json.loads fails — every JSON-returning call needs both the config flag and a JSON-shaped prompt. For multimodal calls, pass content as a list (text + parts), not a single string.
from google import genai
from google.genai import types
from PIL import Image
from pathlib import Path
client = genai.Client(api_key=os.environ['GEMINI_API_KEY'])
def classify_image(image_path: Path, categories: list[str]) -> dict:
image = Image.open(image_path)
prompt = f"""Analyze this image and classify it.
Available categories: {', '.join(categories)}
Respond with JSON:
{{
"category": "category name",
"description": "brief description",
"suggested_filename": "descriptive-filename-with-dashes",
"tags": ["tag1", "tag2", "tag3"]
}}
"""
response = client.models.generate_content(
model='gemini-2.5-flash',
contents=[prompt, image],
config=types.GenerateContentConfig(response_mime_type='application/json'),
)
return json.loads(response.text)
# pathlib.Path.glob does NOT support brace expansion (`*.{jpg,png,webp}`);
# iterate the extensions explicitly.
IMAGE_EXTS = ('.jpg', '.jpeg', '.png', '.webp')
def organize_images(source_dir: Path, output_dir: Path):
categories = ['Nature', 'People', 'Architecture', 'Art', 'Technology', 'Other']
image_paths = (
p for p in source_dir.iterdir()
if p.is_file() and p.suffix.lower() in IMAGE_EXTS
)
for image_path in image_paths:
try:
result = classify_image(image_path, categories)
category_dir = output_dir / result['category']
category_dir.mkdir(parents=True, exist_ok=True)
new_name = f"{result['suggested_filename']}{image_path.suffix.lower()}"
image_path.rename(category_dir / new_name)
except Exception as e:
failures = output_dir / 'failures'
failures.mkdir(parents=True, exist_ok=True)
image_path.rename(failures / image_path.name)
from pathlib import Path
from dotenv import load_dotenv
import os
load_dotenv()
class Config:
# API Keys
GEMINI_API_KEY = os.environ['GEMINI_API_KEY']
GOOGLE_SHEET_ID = os.environ['GOOGLE_SHEET_ID']
# Paths
PROJECT_ROOT = Path(__file__).parent.parent
DATA_DIR = PROJECT_ROOT / 'data'
OUTPUT_DIR = PROJECT_ROOT / 'output'
CREDENTIALS_PATH = PROJECT_ROOT / 'google_credentials.json'
# Rate limits
API_CALLS_PER_MINUTE = 10
BATCH_SIZE = 50
@classmethod
def ensure_dirs(cls):
cls.DATA_DIR.mkdir(exist_ok=True)
cls.OUTPUT_DIR.mkdir(exist_ok=True)
import logging
from pathlib import Path
from datetime import datetime
def setup_logging(log_dir: Path, name: str = 'pipeline') -> logging.Logger:
log_dir.mkdir(exist_ok=True)
logger = logging.getLogger(name)
logger.setLevel(logging.DEBUG)
# Console handler (INFO+)
console = logging.StreamHandler()
console.setLevel(logging.INFO)
console.setFormatter(logging.Formatter('%(levelname)s: %(message)s'))
# File handler (DEBUG+)
log_file = log_dir / f"{name}_{datetime.now():%Y%m%d_%H%M%S}.log"
file_handler = logging.FileHandler(log_file)
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
))
logger.addHandler(console)
logger.addHandler(file_handler)
return logger
Google Sheets cell limits:
MAX_CELL_LENGTH = 50000
def truncate_for_sheets(text: str) -> str:
if len(text) > MAX_CELL_LENGTH:
return text[:MAX_CELL_LENGTH - 20] + '... [truncated]'
return text
CSV encoding issues:
# Always specify encoding
with open(path, 'r', encoding='utf-8-sig') as f: # BOM handling
reader = csv.reader(f)
API quota management:
# Cache API responses
from functools import lru_cache
@lru_cache(maxsize=1000)
def cached_api_call(url: str) -> dict:
return api_client.fetch(url)
development
Use this skill when creating new files that represent architectural decisions — data models, infrastructure configs, auth boundaries, API contracts, CI/CD pipelines, or event systems. Flags irreversible decisions and forces a discussion about trade-offs before committing.
testing
Configure install-time cooldowns for npm/bun (minimum release age) and run a sandboxed pre-install scan when the cooldown has to be bypassed. Use when the user asks about supply-chain attacks, npm/bun security, "minimum release age", a "cooldown" for installs, hardening against Shai-Hulud-class worms, or how to safely install a package that was just published. Also use after any recent supply-chain incident in the npm ecosystem.
tools
Generate CLAUDE.md project memory files that transfer institutional knowledge, not obvious information. Use when setting up new journalism projects, onboarding collaborators, or documenting project-specific quirks. Includes templates for editorial tools, event websites, publications, research projects, content pipelines, and digital archives.
development
Use when suggesting APIs for a project, looking for free data sources, building weekend projects that need external data, or when the user needs weather, news, finance, sports, ML, or entertainment data without paid subscriptions