skills/incremental-processor/SKILL.md
Process large datasets incrementally using hash-based change detection, state persistence, and API pacing to avoid redundant work. Use when building "incremental processing", "change detection", "batch processing", or "resumable pipelines".
| Category | Trigger | Complexity | Source |
|----------|---------|------------|--------|
| patterns | "incremental processing", "change detection", "batch processing", "resumable pipelines" | Medium | 3 projects |
Process large datasets efficiently by tracking what has been processed, detecting changes via content hashing, and skipping unchanged items. Combines a StateManager for checkpoint tracking, an AnalysisIndex for hash-based change detection, and disciplined API pacing to build pipelines that resume without losing progress or repeating work.
Tracks per-source high-watermark checkpoints so you know where to resume. Uses atomic writes (write to .tmp then rename) to prevent corruption on crash.
```python
from __future__ import annotations

import json
from datetime import datetime
from pathlib import Path


class StateManager:
    STATE_FILE = Path('.state') / 'processor_state.json'

    def __init__(self):
        self._state = {"channels": {}, "analysis": {}}
        if self.STATE_FILE.exists():
            try:
                self._state = json.loads(self.STATE_FILE.read_text())
            except json.JSONDecodeError:
                # Quarantine the unreadable file and start fresh instead of crashing
                self.STATE_FILE.replace(self.STATE_FILE.with_suffix('.corrupt'))

    def _save(self):
        self.STATE_FILE.parent.mkdir(parents=True, exist_ok=True)
        tmp = self.STATE_FILE.with_suffix('.tmp')
        tmp.write_text(json.dumps(self._state, indent=2, default=str))
        # Path.replace is atomic on POSIX and, unlike Path.rename,
        # also overwrites an existing target on Windows
        tmp.replace(self.STATE_FILE)

    def get_last_ts(self, channel: str) -> str | None:
        return self._state["channels"].get(channel, {}).get("last_ts")

    def set_last_ts(self, channel: str, ts: str, count: int, threads: int):
        self._state["channels"][channel] = {
            "last_ts": ts, "message_count": count,
            "thread_count": threads, "updated_at": str(datetime.now()),
        }
        self._save()
```
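The `.tmp`-then-rename step can be factored into a reusable helper. This is a minimal sketch that assumes you want durability against power loss, not only against process crashes. `atomic_write_json` is a hypothetical name. `tempfile.mkstemp` creates a uniquely named temp file in the target directory, so concurrent writers never share one `.tmp` and the final swap stays on one filesystem. `os.fsync` flushes the bytes to disk before `os.replace` swaps the file in.

```python
import json
import os
import tempfile
from pathlib import Path


def atomic_write_json(path: Path, data) -> None:
    """Hypothetical helper: readers see either the old file or the new one, never a partial write."""
    path.parent.mkdir(parents=True, exist_ok=True)
    # Unique temp file in the same directory keeps os.replace on one filesystem
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as tmp:
            json.dump(data, tmp, indent=2, default=str)
            tmp.flush()
            os.fsync(tmp.fileno())  # push the bytes to disk before the swap
        os.replace(tmp_name, path)  # atomic on POSIX; overwrites an existing target on Windows
    except BaseException:
        os.unlink(tmp_name)  # don't leave stray temp files behind on failure
        raise
```

With this helper, `StateManager._save` could become `atomic_write_json(self.STATE_FILE, self._state)`.

On POSIX, some filesystems also need an fsync of the parent directory after the replace to make the rename itself durable. That step is omitted here. `mkstemp` creates the file readable and writable only by its owner, and the replaced state file keeps those permissions.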
Key rules:
Records a content hash per processed item. On subsequent runs, compares current hashes against recorded hashes to skip unchanged items.
```python
import hashlib
import json
from datetime import datetime
from pathlib import Path


class AnalysisIndex:
    INDEX_FILE = Path('.state') / 'analysis_index.json'

    def __init__(self):
        self._index = {}
        if self.INDEX_FILE.exists():
            try:
                self._index = json.loads(self.INDEX_FILE.read_text())
            except json.JSONDecodeError:
                # Same recovery as StateManager: quarantine and rebuild from scratch
                self.INDEX_FILE.replace(self.INDEX_FILE.with_suffix('.corrupt'))

    def _save(self):
        self.INDEX_FILE.parent.mkdir(parents=True, exist_ok=True)
        tmp = self.INDEX_FILE.with_suffix('.tmp')
        tmp.write_text(json.dumps(self._index, indent=2))
        tmp.replace(self.INDEX_FILE)  # atomic swap, never a half-written index

    @staticmethod
    def compute_hash(content: str) -> str:
        # SHA-256 truncated to 16 hex chars (64 bits) to keep the index compact
        return hashlib.sha256(content.encode('utf-8')).hexdigest()[:16]

    def is_stale(self, source: str, item_id: str, current_hash: str) -> bool:
        entry = self._index.get(f"{source}:{item_id}")
        if entry is None:
            return True  # New item: never processed
        return entry.get("raw_hash") != current_hash  # Changed since last run

    def record(self, source: str, item_id: str, raw_hash: str, result_id: str):
        self._index[f"{source}:{item_id}"] = {
            "raw_hash": raw_hash, "result_id": result_id,
            "processed_at": str(datetime.now()),
        }
        self._save()
```
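Truncating to 16 hex chars keeps 64 bits of the SHA-256 digest. A birthday-bound estimate, p ≈ 1 − exp(−(n(n−1)/2) / 2^b) for n items and b bits, shows roughly how much collision risk that truncation adds. `collision_probability` is an illustrative name, and the formula is the standard approximation, not an exact count.

```python
import math


def collision_probability(n_items: int, hex_chars: int) -> float:
    """Birthday-bound estimate that at least two of n_items share a truncated hash."""
    bits = 4 * hex_chars  # each hex char encodes 4 bits
    pairs = n_items * (n_items - 1) / 2
    # -expm1(-x) equals 1 - exp(-x) but stays accurate when x is tiny
    return -math.expm1(-pairs / 2 ** bits)


for n in (10_000, 1_000_000, 100_000_000):
    print(f"{n:>11,} items: 16 hex = {collision_probability(n, 16):.1e}, "
          f"32 hex = {collision_probability(n, 32):.1e}")
```

At one million items, 64 bits gives roughly a 3-in-100-million chance of any collision. At a hundred million items, the chance is roughly 3 in 10,000. Plugging in your own dataset size shows when widening to 32 hex chars is worth the larger index.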
Hash design:
Ties StateManager and AnalysisIndex together. For each item: compute hash, check staleness, process if needed, record result.
```python
import time


class ThrottlingError(Exception):
    """Placeholder: raise (or map to) this from your API client when it is rate-limited."""


class IncrementalProcessor:
    def __init__(self, api_client, state: StateManager, index: AnalysisIndex):
        self.api = api_client
        self.state = state
        self.index = index
        self.api_delay = 2.0  # seconds between API calls

    def process_source(self, source_id: str, items: list[dict]):
        # Assumes items are sorted by ascending timestamp
        processed, skipped = 0, 0
        watermark = None  # timestamp of the last item in the completed prefix
        for item in items:
            content_hash = AnalysisIndex.compute_hash(item["content"])
            if not self.index.is_stale(source_id, item["id"], content_hash):
                skipped += 1
                watermark = item["timestamp"]
                continue
            try:
                result = self._call_with_retry(item["content"])
            except ThrottlingError:
                break  # The watermark stops before this item, so the next run retries it
            self.index.record(source_id, item["id"], content_hash, result["id"])
            processed += 1
            watermark = item["timestamp"]
            time.sleep(self.api_delay)
        if watermark is not None:
            # Advance only past items that were actually processed or skipped
            self.state.set_last_ts(source_id, watermark, processed, skipped)
        return {"processed": processed, "skipped": skipped}

    def _call_with_retry(self, content: str, max_retries: int = 3) -> dict:
        for attempt in range(max_retries):
            try:
                return self.api.analyze(content)
            except ThrottlingError:
                if attempt == max_retries - 1:
                    raise
                time.sleep(2 ** attempt)  # Exponential backoff: 1s, then 2s with max_retries=3
```
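`_call_with_retry` hard-codes `time.sleep`, which makes the backoff slow to test. The sketch below implements the same 2^attempt policy with the sleep function injected and an optional cap. `retry_with_backoff`, `retry_on`, and `max_delay` are illustrative names, not part of the original code.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    call: Callable[[], T],
    retry_on: Tuple[Type[BaseException], ...],
    max_retries: int = 3,
    max_delay: float = 60.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call `call`, retrying on `retry_on` with delays of 1s, 2s, 4s, ... capped at max_delay."""
    for attempt in range(max_retries):
        try:
            return call()
        except retry_on:
            if attempt == max_retries - 1:
                raise  # out of attempts: let the caller stop and resume next run
            sleep(min(2 ** attempt, max_delay))
    raise ValueError("max_retries must be >= 1")
```

Inside `IncrementalProcessor`, the retry call could become `retry_with_backoff(lambda: self.api.analyze(content), (ThrottlingError,))`. A test can pass `sleep=delays.append` to record the schedule instead of waiting.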
Pacing rules:
On throttling, back off exponentially: wait 2^attempt seconds between retries. Never use fixed-delay retries.

All state lives in `.state/` (add it to `.gitignore`; mount it as a Docker volume for persistence). For multiple sources, namespace the files: `slack_state.json`, `github_index.json`, etc.
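One way to namespace by source is to derive each file name from the source and the file's role. This is a minimal sketch. `state_path` and its `kind` values are hypothetical, and the character check only keeps a source name from escaping `.state/`.

```python
import re
from pathlib import Path

STATE_DIR = Path(".state")
_SAFE_NAME = re.compile(r"[A-Za-z0-9_-]+")


def state_path(source: str, kind: str) -> Path:
    """Return .state/<source>_<kind>.json, e.g. ("slack", "state") -> .state/slack_state.json."""
    for part in (source, kind):
        if not _SAFE_NAME.fullmatch(part):
            raise ValueError(f"unsafe name component: {part!r}")
    return STATE_DIR / f"{source}_{kind}.json"
```

`StateManager` and `AnalysisIndex` read their paths from the `STATE_FILE` and `INDEX_FILE` class attributes via `self`. A per-source subclass can therefore override them, for example with `STATE_FILE = state_path("slack", "state")`.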
```python
def run_pipeline(api_client, sources: list[str]):
    state = StateManager()
    index = AnalysisIndex()
    processor = IncrementalProcessor(api_client, state, index)
    for source_id in sources:
        last_ts = state.get_last_ts(source_id)
        items = api_client.fetch_items(source_id, since=last_ts)
        if not items:
            continue
        result = processor.process_source(source_id, items)
        print(f"[{source_id}] Processed: {result['processed']}, Skipped: {result['skipped']}")
```
To force reprocessing after the analysis logic changes, clear the index: `index._index = {}; index._save()`. Without clearing, partial failures resolve naturally: recorded hashes let the next run skip items that already completed.
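An alternative to clearing the index by hand is to fold an analysis version string into the hash. This is an assumption, not part of the original design. Bumping the version makes every recorded hash mismatch, so `is_stale` returns `True` for all items on the next run. `ANALYSIS_VERSION` and `versioned_hash` are hypothetical names, and the truncation matches `compute_hash`.

```python
import hashlib

ANALYSIS_VERSION = "v1"  # hypothetical: bump whenever prompts or analysis logic change


def versioned_hash(content: str, version: str = ANALYSIS_VERSION) -> str:
    """SHA-256 of version + separator + content, truncated to 16 hex chars like compute_hash."""
    h = hashlib.sha256()
    h.update(version.encode("utf-8"))
    h.update(b"\x00")  # separator so ("v1", "2x") and ("v12", "x") hash differently
    h.update(content.encode("utf-8"))
    return h.hexdigest()[:16]
```

A version bump reprocesses everything, just like clearing the index. The difference is that the trigger lives in version control instead of in a manual step.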
| Problem | Cause | Fix |
|---------|-------|-----|
| Everything reprocesses every run | Non-deterministic content hashing | Normalize before hashing: sort keys, strip whitespace, remove volatile fields |
| State file empty or `{}` | Crash during write | Use atomic writes: write to `.tmp`, then `os.replace` |
| Rate limit despite pacing | Multiple instances running | Use a lock file in .state/ to prevent concurrent runs |
| Index grows indefinitely | Deleted items never cleaned | Periodically prune entries whose source items no longer exist |
| Hash collisions skip items | Truncated hash too short | Increase hash from 16 to 32 hex chars for datasets over 1M items |
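The first row of the table can be addressed with a normalization step before hashing. This minimal sketch is for dict-shaped items and assumes the volatile fields are known up front. `VOLATILE_FIELDS` and `normalized_hash` are hypothetical names. Pick fields that change without the content changing, such as fetch timestamps or view counts.

```python
import hashlib
import json
from typing import Any

VOLATILE_FIELDS = {"fetched_at", "view_count", "etag"}  # hypothetical examples


def _normalize(value: Any) -> Any:
    """Recursively drop volatile keys and collapse whitespace runs in strings."""
    if isinstance(value, dict):
        return {k: _normalize(v) for k, v in value.items() if k not in VOLATILE_FIELDS}
    if isinstance(value, list):
        return [_normalize(v) for v in value]
    if isinstance(value, str):
        return " ".join(value.split())  # also strips leading/trailing whitespace
    return value


def normalized_hash(item: dict) -> str:
    # sort_keys + fixed separators give one serialization per logical value
    canonical = json.dumps(_normalize(item), sort_keys=True, separators=(",", ":"),
                           ensure_ascii=False)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:16]
```

Collapsing internal whitespace is a judgment call. Drop that step if whitespace is meaningful in your content, for example source code.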
`_call_with_retry` for provider switching