.agents/skills/add_engine/SKILL.md
Step-by-step guide to add a new data source engine to DataPipeline OS
Add New Engine
This skill documents the complete process for adding a new data source engine to DataPipeline OS, following the established generator pattern and pluggable architecture.
Every row an engine yields uses these columns: keyword, source, date, content, engagement_score.
Create engines/<source>_engine.py using this template:
"""
engines/<source>_engine.py
<Source Name> scraper.
- Generator pattern: chunk → yield → clear(); RAM flat O(chunk_size)
- Dual mode: set <ENV_VAR> in .env for live, blank = simulation
"""
import os
import time
import random
from typing import Generator, List


class <Source>Engine:
    # Columns every yielded row must contain
    SAFE_COLUMNS = {"keyword", "source", "date", "content", "engagement_score"}

    def __init__(
        self,
        keywords: List[str],
        start_date: str,
        end_date: str,
        language: str = "id",
        chunk_size: int = 500,
    ):
        self.keywords = keywords
        self.start_date = start_date
        self.end_date = end_date
        self.language = language
        self.chunk_size = chunk_size
        self.all_data: List[dict] = []

    def fetch(self) -> Generator[List[dict], None, None]:
        # Live mode when <ENV_VAR> is set, simulation when it is blank
        api_key = os.getenv("<ENV_VAR>", "").strip()
        if api_key:
            yield from self._fetch_live(api_key)
        else:
            yield from self._fetch_simulation()

    def _fetch_live(self, api_key: str) -> Generator[List[dict], None, None]:
        buffer: List[dict] = []
        for keyword in self.keywords:
            # ... fetch data from API ...
            api_results: List[dict] = []  # placeholder: assign the API response for this keyword
            for item in api_results:
                buffer.append({
                    "keyword": keyword,
                    "source": "<source_name>",
                    "date": "YYYY-MM-DD",
                    "content": "text content here",
                    "engagement_score": 0,
                })
                if len(buffer) >= self.chunk_size:
                    self.all_data.extend(buffer)
                    yield buffer
                    buffer.clear()
        # Flush whatever is left after the last keyword
        if buffer:
            self.all_data.extend(buffer)
            yield buffer
            buffer.clear()

    def _fetch_simulation(self) -> Generator[List[dict], None, None]:
        for keyword in self.keywords:
            # 3 to 7 simulated chunks per keyword
            for _ in range(random.randint(3, 7)):
                buffer: List[dict] = []
                size = random.randint(int(self.chunk_size * 0.5), self.chunk_size)
                for _ in range(size):
                    buffer.append({
                        "keyword": keyword,
                        "source": "<source_name>",
                        "date": f"2023-{random.randint(1,12):02d}-{random.randint(1,28):02d}",
                        "content": f"Simulated content about {keyword}",
                        "engagement_score": random.randint(0, 10000),
                    })
                self.all_data.extend(buffer)
                time.sleep(0.15)  # simulated network latency
                yield buffer
                buffer.clear()
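A minimal sketch of how a caller might consume the generator, assuming a hypothetical `DemoEngine` that follows the template's chunk, yield, clear() cycle. Because the same list object is yielded and then cleared, a consumer has to finish with each chunk (or copy it) before asking for the next one. `DemoEngine` and `consume` are illustrative names, not part of DataPipeline OS.

```python
from typing import Generator, List


class DemoEngine:
    """Hypothetical stand-in for an engine built from the template."""

    def __init__(self, keywords: List[str], chunk_size: int = 3, rows_per_keyword: int = 4):
        self.keywords = keywords
        self.chunk_size = chunk_size
        self.rows_per_keyword = rows_per_keyword
        self.all_data: List[dict] = []

    def fetch(self) -> Generator[List[dict], None, None]:
        buffer: List[dict] = []
        for keyword in self.keywords:
            for i in range(self.rows_per_keyword):
                buffer.append({
                    "keyword": keyword,
                    "source": "demo",
                    "date": "2023-01-01",
                    "content": f"row {i} about {keyword}",
                    "engagement_score": i,
                })
                if len(buffer) >= self.chunk_size:
                    self.all_data.extend(buffer)
                    yield buffer
                    buffer.clear()
        if buffer:  # flush the final partial chunk
            self.all_data.extend(buffer)
            yield buffer
            buffer.clear()


def consume(engine: DemoEngine) -> dict:
    total = 0
    kept_reference = None
    kept_copy = None
    for chunk in engine.fetch():
        total += len(chunk)
        if kept_reference is None:
            kept_reference = chunk   # same list object the engine clears later
            kept_copy = list(chunk)  # shallow copy survives the clear()
    return {
        "total": total,
        "reference_len": len(kept_reference or []),
        "copy_len": len(kept_copy or []),
    }


engine = DemoEngine(["a", "b"])
result = consume(engine)
```

Here the retained reference ends up empty while the copy keeps its rows, which is why downstream code should process or copy each chunk inside the loop.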
Add the engine to the ENGINE_REGISTRY list in app.py:
from engines.<source>_engine import <Source>Engine
ENGINE_REGISTRY = [
    # ... existing engines ...
    ("<source_name>", <Source>Engine, "<eng_key>", "<4CHAR>"),
]
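One way such registry tuples might be consumed is sketched below. The meaning of each field (source name, engine class, state key, four-character label) is inferred from the placeholder names, and `StubEngine` and `run_selected` are hypothetical; the real app.py may drive engines differently, and the stub's constructor is simplified compared with the template.

```python
from typing import Dict, Generator, List


class StubEngine:
    """Hypothetical engine that yields two chunks, one row per keyword."""

    def __init__(self, keywords: List[str]):
        self.keywords = keywords

    def fetch(self) -> Generator[List[dict], None, None]:
        for _ in range(2):
            yield [{"keyword": k} for k in self.keywords]


# (source_name, engine class, state key, short label): assumed field order
ENGINE_REGISTRY = [
    ("stub", StubEngine, "stub_eng", "STUB"),
]


def run_selected(selected: List[str], keywords: List[str]) -> Dict[str, int]:
    """Run every registered engine whose source name was selected; return row counts."""
    rows_by_key: Dict[str, int] = {}
    for source_name, engine_cls, eng_key, _label in ENGINE_REGISTRY:
        if source_name not in selected:
            continue
        rows_by_key[eng_key] = sum(len(chunk) for chunk in engine_cls(keywords).fetch())
    return rows_by_key


counts = run_selected(["stub"], ["x", "y", "z"])
```

Keeping the class in the tuple means a new engine needs only one registry line rather than a new branch in the run loop.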
And add the engine state in pipeline_state["engines"]:
"<eng_key>": {"status": "idle", "rows": 0, "ram_mb": 0},
In frontend/templates/index.html:
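Add the markup for the new engine using these attributes:
- value="<source_name>"
- data-engine="<eng_key>"

Then check the remaining items:
- .env.example: <ENV_VAR> documented with instructions
- requirements.txt: any new dependencies listed
- .env: <ENV_VAR> set, then test live mode
- Generator pattern: yield buffer then buffer.clear()
- self.all_data populated (needed by schema mapper)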
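A small smoke test can cover the generator pattern and the `self.all_data` requirement. The sketch below assumes an engine exposing `fetch()`, `chunk_size`, `all_data`, and `SAFE_COLUMNS` as in the template; `check_engine` and `TinyEngine` are hypothetical names introduced only for illustration.

```python
from typing import Generator, List


class TinyEngine:
    """Hypothetical engine following the template's yield-then-clear pattern."""

    SAFE_COLUMNS = {"keyword", "source", "date", "content", "engagement_score"}

    def __init__(self, chunk_size: int = 2, total_rows: int = 5):
        self.chunk_size = chunk_size
        self.total_rows = total_rows
        self.all_data: List[dict] = []

    def fetch(self) -> Generator[List[dict], None, None]:
        buffer: List[dict] = []
        for i in range(self.total_rows):
            buffer.append({
                "keyword": "demo",
                "source": "tiny",
                "date": "2023-01-01",
                "content": f"row {i}",
                "engagement_score": i,
            })
            if len(buffer) >= self.chunk_size:
                self.all_data.extend(buffer)
                yield buffer
                buffer.clear()
        if buffer:
            self.all_data.extend(buffer)
            yield buffer
            buffer.clear()


def check_engine(engine) -> int:
    """Return the number of rows seen; raise AssertionError on a violation."""
    rows_seen = 0
    for chunk in engine.fetch():
        assert 0 < len(chunk) <= engine.chunk_size, "chunk size out of bounds"
        for row in chunk:
            assert set(row) == engine.SAFE_COLUMNS, f"unexpected columns: {sorted(row)}"
        rows_seen += len(chunk)
    assert len(engine.all_data) == rows_seen, "all_data out of sync with yielded rows"
    return rows_seen


tiny = TinyEngine()
rows_checked = check_engine(tiny)
```

Running the same check with <ENV_VAR> blank and then set would exercise both simulation and live mode, assuming the live API is reachable.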