skills/running-dag-pipelines/SKILL.md
Build and execute modular DAG workflows for long-context processing using slice/map/reduce/recurse/compact/filter operators. Use for one-shot batch jobs, standalone map-reduce pipelines, or when the context-dag plugin is not installed. Trigger when input exceeds the model's context window, when reproducible logged pipelines are needed, or when multi-level recursive processing is required. If context-dag is installed, the plugin's bundled dag_runner.py provides the same capability with persistent artifact storage.
Install this skill globally with one command (works with Claude Code, Cursor, and Windsurf):
npx skillsauth add aufrank/agent-skills running-dag-pipelines
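A skill for composing directed acyclic graph (DAG) workflows that process arbitrarily long inputs and manage arbitrarily long sessions. It unifies two complementary approaches: RLM-style recursive processing (slice, map, reduce, recurse) and LCM-style lossless context management (compact, artifact store, expand).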
| Situation | Action |
|---|---|
| Input exceeds the model's context window | Use slice → map → reduce |
| Need relevance filtering before expensive LLM calls | Add a filter node |
| Input is much larger than context and may need multi-level processing | Use recurse operator |
| Long-running multi-turn session risks context rot | Use compact operator + artifact store |
| Need a reproducible, logged, artifact-backed processing pipeline | Define a dag_spec |
Templates live in scripts/templates/. Three are provided:
- flat_map_reduce.json: slice → map → reduce (simplest RLM pattern)
- hierarchical_summarize.json: slice → filter → map → compact → reduce
- recursive_deep_dive.json: single recurse node with depth-bounded processing

Copy a template and edit the config fields (model, prompts, thresholds).
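Editing a copied template can also be scripted. The sketch below assumes the templates follow the dag_spec layout described later in this document (a meta block plus a nodes list). The inline TEMPLATE dict, its node IDs, and the helper name customize_spec are illustrative stand-ins, not the contents of the shipped files.

```python
import copy
import json

# Stand-in for a loaded template; the real files in scripts/templates/ may differ.
TEMPLATE = {
    "meta": {"name": "flat-map-reduce", "default_model": "gpt-4o"},
    "nodes": [
        {"id": "split", "operator": "slice",
         "config": {"chunk_size": 200000}, "inputs": ["$input"]},
        {"id": "process", "operator": "map",
         "config": {"prompt_template": "{input}"}, "inputs": ["split"]},
        {"id": "combine", "operator": "reduce",
         "config": {}, "inputs": ["process"]},
    ],
}


def customize_spec(template, default_model=None, overrides=None):
    """Return a copy of `template` with per-node config overrides applied.

    `overrides` maps a node id to a dict of config keys to replace.
    """
    spec = copy.deepcopy(template)  # never mutate the original template
    if default_model is not None:
        spec["meta"]["default_model"] = default_model
    overrides = overrides or {}
    known_ids = {node["id"] for node in spec["nodes"]}
    unknown = set(overrides) - known_ids
    if unknown:
        raise KeyError(f"no such node id(s): {sorted(unknown)}")
    for node in spec["nodes"]:
        node["config"].update(overrides.get(node["id"], {}))
    return spec


spec = customize_spec(
    TEMPLATE,
    default_model="gpt-4o-mini",
    overrides={"process": {"prompt_template": "Summarize:\n\n{input}"}},
)
print(json.dumps(spec["nodes"][1]["config"], indent=2))
```

Working on a deep copy keeps the template reusable across runs; writing the result with json.dump produces a file that can be passed to --spec.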
cd skills/context-dag/scripts
python dag_runner.py \
--spec templates/flat_map_reduce.json \
--input /path/to/large_document.txt \
--out-dir ./my_run
my_run/
  progress.jsonl     # full event log
  run_summary.json   # execution metadata
  result.txt         # final output
  split/             # per-node artifact directories
  process/
  combine/
slice
Splits a single input into N chunks.
| Config key | Type | Default | Description |
|---|---|---|---|
| strategy | "headings" \| "markers" \| "fixed" | "fixed" | Slicing strategy |
| chunk_size | int | 200000 | Characters per chunk |
| overlap | int | 0 | Char overlap for fixed chunks |
| max_slices | int | 20 | Maximum number of slices |
| marker_start | str | — | Regex for marker-based slicing |
| marker_end | str | — | Regex for marker end |
Inputs: 1 artifact → Outputs: N artifacts (one per slice)
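To make the interaction of chunk_size, overlap, and max_slices concrete, here is a minimal sketch of the "fixed" strategy. The function name fixed_slices is hypothetical, and the handling of overflow (the last slice absorbs whatever remains once max_slices is reached) is an assumption; the actual runner may truncate or behave differently.

```python
def fixed_slices(text, chunk_size=200000, overlap=0, max_slices=20):
    """Split `text` into character chunks of at most `chunk_size`.

    Consecutive chunks share `overlap` characters. Assumption: once
    `max_slices` is reached, the final chunk takes all remaining text.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")
    if max_slices < 1:
        raise ValueError("max_slices must be at least 1")

    step = chunk_size - overlap  # how far the window advances each time
    chunks = []
    start = 0
    while start < len(text):
        if len(chunks) == max_slices - 1:
            chunks.append(text[start:])  # final slice takes the remainder
            break
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # this chunk already reached the end of the text
        start += step
    return chunks


parts = fixed_slices("abcdefghij", chunk_size=4, overlap=1)
print(parts)
```

With overlap set, each chunk repeats the tail of the previous one, which helps when a sentence or record straddles a chunk boundary.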
map
Runs an LLM call on each input artifact in parallel (LCM's LLM-Map pattern).
| Config key | Type | Default | Description |
|---|---|---|---|
| model | str | from $meta.default_model | LLM model |
| system | str | "" | System prompt |
| prompt_template | str | "{input}" | Format string with {input} |
| temperature | float | 0.0 | |
| max_tokens | int | 4096 | |
| json_schema | dict | — | Optional structured output schema |
| concurrency | int | 8 | Max parallel LLM calls |
Inputs: N artifacts → Outputs: N artifacts (1:1)
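The 1:1 contract and the concurrency cap can be sketched with a thread pool. This is an illustration under stated assumptions, not the runner's code: map_artifacts and fake_llm are hypothetical names, and fake_llm stands in for a real model call so the example runs offline.

```python
from concurrent.futures import ThreadPoolExecutor


def fake_llm(prompt):
    """Placeholder for a real model call; just reports the prompt length."""
    return f"[{len(prompt)} chars]"


def map_artifacts(inputs, prompt_template="{input}", concurrency=8, call=fake_llm):
    """Apply `call` to every input, at most `concurrency` at a time."""
    prompts = [prompt_template.format(input=text) for text in inputs]
    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        # Executor.map yields results in input order, which gives the
        # N-in, N-out (1:1) correspondence regardless of completion order.
        return list(pool.map(call, prompts))


outputs = map_artifacts(["alpha", "beta", "gamma"],
                        prompt_template="Summarize: {input}")
print(outputs)
```

Because the template is applied with str.format, a prompt_template that contains literal braces (for example an inline JSON example) would need them doubled as {{ and }} in this sketch.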
reduce
Merges N inputs into one. Three mode settings:
- single: concatenate all inputs, one LLM call
- tree: pairwise reduction in log(n) rounds (LCM-style tree-reduce)
- auto (default): single if ≤ tree_threshold inputs, tree otherwise

| Config key | Type | Default | Description |
|---|---|---|---|
| model | str | from meta | LLM model |
| system | str | (built-in) | System prompt |
| prompt_template | str | "{input}" (single) or "{left}" / "{right}" (tree) | |
| mode | "single" \| "tree" \| "auto" | "auto" | |
| tree_threshold | int | 6 | Inputs above this trigger tree mode |
| concurrency | int | 4 | Parallel pairs in tree mode |
Inputs: N artifacts → Outputs: 1 artifact
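The difference between the modes is easiest to see in code. The sketch below uses string-joining stand-ins for the LLM calls; tree_reduce, reduce_artifacts, and both combine callbacks are hypothetical names, and pairs are merged sequentially here, whereas the runner is described as merging up to concurrency pairs in parallel.

```python
def tree_reduce(items, combine_pair):
    """Pairwise reduction: each round roughly halves the list.

    Returns (result, number_of_rounds).
    """
    if not items:
        raise ValueError("nothing to reduce")
    level = list(items)
    rounds = 0
    while len(level) > 1:
        merged = [combine_pair(level[i], level[i + 1])
                  for i in range(0, len(level) - 1, 2)]
        if len(level) % 2:
            merged.append(level[-1])  # odd item is carried to the next round
        level = merged
        rounds += 1
    return level[0], rounds


def reduce_artifacts(items, combine_pair, combine_all,
                     mode="auto", tree_threshold=6):
    """Dispatch between single and tree reduction."""
    if mode not in ("single", "tree", "auto"):
        raise ValueError(f"unknown mode: {mode!r}")
    if mode == "auto":
        mode = "single" if len(items) <= tree_threshold else "tree"
    if mode == "single":
        return combine_all(items)
    return tree_reduce(items, combine_pair)[0]


def pair(left, right):
    return f"({left}+{right})"


def join_all(items):
    return "+".join(items)


small = reduce_artifacts(["a", "b", "c"], pair, join_all)
large = reduce_artifacts(list("abcdefg"), pair, join_all)
print(small)
print(large)
```

Seven inputs exceed the default tree_threshold of 6, so the second call reduces in three rounds (7 → 4 → 2 → 1), keeping every individual call small at the cost of more calls overall.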
recurse
Depth-bounded recursive processing (RLM's core pattern). At each level: terminal inputs get a leaf LLM call; non-terminal inputs are sliced, mapped recursively, and reduced.
| Config key | Type | Default | Description |
|---|---|---|---|
| model | str | from meta | LLM model |
| system | str | "" | System prompt for leaf calls |
| prompt_template | str | "{input}" | |
| max_depth | int | 3 | Maximum recursion depth |
| size_threshold | int | 50000 | Chars below which input is terminal |
| chunk_size | int | 100000 | Chars per sub-slice |
| max_slices | int | 10 | |
| reduce_system | str | (built-in) | System prompt for intermediate reductions |
| concurrency | int | 4 | |
Inputs: N artifacts → Outputs: N artifacts (each recursively processed)
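The control flow for a single input can be sketched as a short recursive function. This is a simplified illustration, not the runner's implementation: recurse_process, leaf, and the use of sum as the reducer are hypothetical, slicing is plain fixed-width, and two behaviors are assumptions (overflow beyond max_slices is folded into the last slice, and an oversized input that hits max_depth is sent to the leaf call as is).

```python
def recurse_process(text, leaf, combine, depth=0, max_depth=3,
                    size_threshold=50000, chunk_size=100000, max_slices=10):
    """Process `text` recursively and return one combined result."""
    # Terminal case: small input, or the depth budget is spent.
    if len(text) < size_threshold or depth >= max_depth:
        return leaf(text)

    chunks = [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]
    if len(chunks) > max_slices:
        # Assumption: overflow beyond max_slices is folded into the last slice.
        chunks = chunks[:max_slices - 1] + ["".join(chunks[max_slices - 1:])]

    results = [
        recurse_process(chunk, leaf, combine, depth + 1, max_depth,
                        size_threshold, chunk_size, max_slices)
        for chunk in chunks
    ]
    return combine(results)


seen = []  # sizes of the inputs that reached a leaf call


def leaf(text):
    """Stand-in for a leaf LLM call: records and returns the input size."""
    seen.append(len(text))
    return len(text)


total = recurse_process("x" * 100, leaf, sum, max_depth=3,
                        size_threshold=10, chunk_size=8, max_slices=3)
print(total, seen)
```

In this toy run the third slice at each level is still above size_threshold, so it recurses again until max_depth stops it; every character reaches exactly one leaf call, which is why the combined total equals the input length.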
compact
LCM-style context compaction with 3-level escalation. Writes compaction_pointers.json mapping summaries → originals for lossless retrieval.
| Config key | Type | Default | Description |
|---|---|---|---|
| model | str | from meta | |
| system | str | (built-in) | |
| level | 1 \| 2 \| 3 \| "auto" | "auto" | |
| soft_threshold | int | 100000 | Chars to trigger level 1 |
| hard_threshold | int | 200000 | Chars to trigger level 2 |
| fallback_chars | int | 2000 | Chars kept in level-3 fallback |
| max_summary_tokens | int | 1024 | |
Inputs: N artifacts → Outputs: N artifacts (compacted where needed)
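One way the thresholds in the table could drive escalation is sketched below. Only the config names and their stated meanings come from the table; everything else is an assumption made for illustration: the function names, the use of >= at each threshold, the idea that level 3 is reached when a summary fails to shrink the text, and truncation from the head of the text.

```python
def pick_level(n_chars, soft_threshold=100000, hard_threshold=200000):
    """Return 0 (leave untouched), 1, or 2 based on input size."""
    if n_chars >= hard_threshold:
        return 2
    if n_chars >= soft_threshold:
        return 1
    return 0


def compact(text, summarize, soft_threshold=100000, hard_threshold=200000,
            fallback_chars=2000):
    """Return (compacted_text, level_used).

    `summarize(text, level)` stands in for the LLM summarization call.
    """
    level = pick_level(len(text), soft_threshold, hard_threshold)
    if level == 0:
        return text, 0
    summary = summarize(text, level)
    if len(summary) < len(text):
        return summary, level
    # Assumed level-3 trigger: the summary did not shrink the text,
    # so fall back to deterministic truncation.
    return text[:fallback_chars], 3


def shrinking(text, level):
    return text[:10]


def failing(text, level):
    return text + "!"  # a "summary" longer than its input


print(compact("a" * 50, shrinking, soft_threshold=100, hard_threshold=200))
print(compact("a" * 150, shrinking, soft_threshold=100, hard_threshold=200)[1])
print(compact("a" * 250, failing, soft_threshold=100, hard_threshold=200,
              fallback_chars=20)[1])
```

A deterministic last level guarantees that compaction always terminates with a smaller artifact, even when the model call does not cooperate.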
filter
Relevance gating: scores each input against a query and drops those below threshold.
| Config key | Type | Default | Description |
|---|---|---|---|
| model | str | "gpt-4o-mini" | Scoring model |
| query | str | required | Relevance question |
| threshold | float | 0.3 | Score cutoff (0.0–1.0) |
| concurrency | int | 8 | |
Inputs: N artifacts → Outputs: ≤ N artifacts (relevant ones only)
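The gating step itself is a simple comparison once each input has a score. In the sketch below, keyword_score is a toy stand-in for the scoring model (it is not how the operator scores relevance), and filter_artifacts is a hypothetical name. Inputs scoring exactly at the threshold are kept, matching "drops those below threshold".

```python
def keyword_score(text, query):
    """Toy relevance score in [0, 1]: fraction of query words found in text."""
    words = set(query.lower().split())
    if not words:
        return 0.0
    lowered = text.lower()
    hits = sum(1 for word in words if word in lowered)
    return hits / len(words)


def filter_artifacts(inputs, query, threshold=0.3, score=keyword_score):
    """Keep inputs whose score is at or above `threshold`, in original order."""
    return [text for text in inputs if score(text, query) >= threshold]


docs = [
    "DAG runner logs events",
    "cooking pasta at home",
    "runner config",
]
kept = filter_artifacts(docs, query="dag runner")
print(kept)
```

Placing a cheap filter ahead of map means the more expensive model only sees inputs that passed the cutoff.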
A JSON (or YAML) file with:
{
  "meta": {
    "name": "workflow-name",
    "description": "...",
    "default_model": "gpt-4o"
  },
  "nodes": [
    {
      "id": "unique_node_id",
      "operator": "slice|map|reduce|recurse|compact|filter",
      "config": { ... },
      "inputs": ["$input", "other_node_id", ...]
    }
  ]
}
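The inputs lists in a spec like the one above define the graph's edges, so a valid execution order can be derived with a topological sort. A minimal sketch using Python's standard graphlib module (Python 3.9+) follows. The function name execution_order, the example node IDs, and the validation behavior are assumptions for illustration, not a description of dag_runner.py.

```python
from graphlib import CycleError, TopologicalSorter


def execution_order(spec):
    """Return node ids in an order where every node follows its inputs."""
    node_ids = {node["id"] for node in spec["nodes"]}
    graph = {}
    for node in spec["nodes"]:
        # "$input" is the external input, not a node, so it adds no edge.
        deps = [ref for ref in node["inputs"] if ref != "$input"]
        missing = [ref for ref in deps if ref not in node_ids]
        if missing:
            raise ValueError(f"node {node['id']!r} references unknown inputs: {missing}")
        graph[node["id"]] = deps
    # static_order() raises CycleError if the spec is not acyclic.
    return list(TopologicalSorter(graph).static_order())


example_spec = {
    "meta": {"name": "demo", "default_model": "gpt-4o"},
    "nodes": [
        # Deliberately listed out of order.
        {"id": "combine", "operator": "reduce", "config": {}, "inputs": ["process"]},
        {"id": "split", "operator": "slice", "config": {}, "inputs": ["$input"]},
        {"id": "process", "operator": "map", "config": {}, "inputs": ["split"]},
    ],
}

order = execution_order(example_spec)
print(order)
```

Validating references before sorting turns a typo in an inputs list into a clear error instead of a silently disconnected node.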
- $input is the DAG's external input (the file passed via --input)
- inputs reference other node IDs; the runner topologically sorts and routes artifacts automatically
- Use "$meta.default_model" to inherit from meta

For multi-session persistence, the ArtifactStore class in scripts/artifact_store.py provides:
- index.jsonl
- put_summary() records which originals a summary compresses
- expand(ref) retrieves the original texts behind any summary node (one DAG layer at a time, matching LCM's lcm_expand pattern)

from pathlib import Path
from artifact_store import ArtifactStore
store = ArtifactStore(Path("./my_store"))
ref = store.put("full text...", tag="msg_42")
summary_ref = store.put("brief summary", tag="summary_42")
store.put_summary(summary_ref, original_refs=[ref])
# Later: retrieve originals
originals = store.expand(summary_ref) # → ["full text..."]
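To show what "one DAG layer at a time" means for expand, here is a small in-memory stand-in with the same three method names. TinyStore is hypothetical and is not the ArtifactStore implementation: deriving refs from a SHA-256 hash of the text and keeping everything in dicts are assumptions made so the example runs without the skill's scripts.

```python
import hashlib


class TinyStore:
    """In-memory illustration only; not the real ArtifactStore."""

    def __init__(self):
        self._texts = {}      # ref -> stored text
        self._tags = {}       # ref -> tag given at put() time
        self._originals = {}  # summary ref -> refs it compresses

    def put(self, text, tag=""):
        # Assumption: refs are derived from the content hash.
        ref = hashlib.sha256(text.encode("utf-8")).hexdigest()[:12]
        self._texts[ref] = text
        self._tags[ref] = tag
        return ref

    def put_summary(self, summary_ref, original_refs):
        self._originals[summary_ref] = list(original_refs)

    def expand(self, ref):
        """Return the texts directly behind `ref` (one layer only)."""
        return [self._texts[r] for r in self._originals.get(ref, [])]


store = TinyStore()
a = store.put("first message", tag="msg_1")
b = store.put("second message", tag="msg_2")
s1 = store.put("summary of 1 and 2", tag="summary_1")
store.put_summary(s1, original_refs=[a, b])

c = store.put("third message", tag="msg_3")
s2 = store.put("summary of everything", tag="summary_2")
store.put_summary(s2, original_refs=[s1, c])

print(store.expand(s2))  # one layer down: the earlier summary plus msg_3
print(store.expand(s1))  # a second expand reaches the first two messages
```

Expanding layer by layer lets a caller stop as soon as it has enough detail, rather than pulling every original back into context at once.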
scripts/llm_client.py provides a llm_call() function with two backends:
llm library
pip install llm # core
llm install llm-anthropic # Claude
llm install llm-gemini # Gemini
# … any of the 70+ llm plugins
When the llm package is installed, the client uses it for all calls. This gives you:
- models from any installed plugin (via llm)
- structured output: schema= with Pydantic or dict
- token usage: response.usage()
- key management: llm keys set (no env vars needed)
- command-line access: llm prompt ...

The llm_call_usage() variant returns both the response text and a usage dict with input/output token counts, which is useful for compaction threshold decisions.
If llm is not installed, the client falls back to direct OpenAI / Google Gemini SDK calls. In this mode:
- gemini models → Google GenAI SDK
- API keys: OPENAI_API_KEY, GEMINI_API_KEY (or GOOGLE_API_KEY)

llm_client exposes two module-level hooks for bolting LCM-style interception onto any tool:
import llm_client

def my_before_hook(model, prompt, system, **kw):
    print(f"About to call {model} with {len(prompt)} chars")

def my_after_hook(model, response_text, usage, **kw):
    # e.g. store in artifact store, check token budget, trigger compaction
    print(f"Got {len(response_text)} chars, used {usage.get('input', '?')} input tokens")

# Register the hooks on the module so every llm_call() passes through them
llm_client.before_call_hook = my_before_hook
llm_client.after_call_hook = my_after_hook
This is how you'd bolt LCM patterns onto standalone tools without privileged
access: intercept at the llm_call layer, persist to the artifact store, and
trigger compaction when token budgets are exceeded.
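The budget check described above might look like the following sketch. TokenBudget is a hypothetical helper; its after_call method takes the same arguments as the after hook shown earlier. The "input" usage key appears in that example, while the "output" key name and the policy of flagging compaction once the running total passes a limit are assumptions.

```python
class TokenBudget:
    """Tracks cumulative token use and flags when compaction is due."""

    def __init__(self, limit):
        self.limit = limit
        self.used = 0
        self.needs_compaction = False

    def after_call(self, model, response_text, usage, **kw):
        # Assumed usage keys: "input" and "output"; missing or None counts as 0.
        self.used += (usage.get("input") or 0) + (usage.get("output") or 0)
        if self.used > self.limit:
            self.needs_compaction = True

    def reset(self):
        """Call after compaction has run to start a fresh budget window."""
        self.used = 0
        self.needs_compaction = False


budget = TokenBudget(limit=1000)

# With the skill's scripts on the path, this would be registered as:
#   llm_client.after_call_hook = budget.after_call
# Here the hook is invoked directly so the example runs standalone.
budget.after_call("gpt-4o", "first reply", {"input": 600, "output": 100})
print(budget.used, budget.needs_compaction)
budget.after_call("gpt-4o", "second reply", {"input": 500, "output": 50})
print(budget.used, budget.needs_compaction)
```

Keeping the state in an object rather than module globals makes it straightforward to run several sessions with separate budgets.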
| Component | Inspired by |
|---|---|
| slice, map, reduce, recurse | RLM (Zhang, Kraska, Khattab 2025) — recursive language model inference |
| compact, artifact store, expand | LCM (Ehrlich / Voltropy 2025) — lossless context management |
| filter | Practical addition for relevance gating |
| DAG engine | Composition layer enabling flexible routing between all patterns |
| map concurrency model | LCM's LLM-Map operator-level recursion |
| Tree-reduce | Discussed in LCM HN thread as planned llm_reduce |