# LLM Architect
Systems architecture skill for LLM pipelines. Turns dumb pipes into intelligent systems where every call is surgically optimized for what it needs to know, when it needs to know it, and which model should think about it.
The thesis: Better architecture makes speed, cost, and quality improve simultaneously. The waste in most LLM systems is so enormous that eliminating it improves all three at once.
Workflow: MAP the current architecture. MEASURE where cost, latency, and quality sit. REDESIGN using the patterns below. VALIDATE with evals.
Context engineering is the discipline of assembling the minimum viable context for each LLM call. It is not prompt engineering (writing good prompts) — it is engineering the context assembly itself.
The thesis from Claude Code: "Prompt caching is a prefix match. Any change anywhere in the prefix invalidates everything after it. Design your entire system around this constraint."
For every LLM call in the system, answer:
| Question | Bad answer | Good answer |
|----------|-----------|-------------|
| What context to assemble? | Everything available | Only what this step needs to reason about |
| When to assemble it? | Eagerly, at pipeline start | Just-in-time, right before the call |
| How much context? | No budget, unlimited | Token budget per step |
| For whom (which model)? | Same context for all models | Compressed for Haiku, full for Opus |
| In what format? | Raw JSON dump | XML for Claude structure, compressed for data |
Order content from most stable to most volatile. This is the primary architectural constraint — not an optimization, but a design requirement. Every layer that changes invalidates everything below it.
[GLOBALLY CACHED] Base system instructions ← changes never
[GLOBALLY CACHED] Tool definitions ← changes never
[CACHED PER PROJECT] Project rules / CLAUDE.md ← changes rarely
[CACHED PER SESSION] Session state / env / config ← changes per session
[SEMI-CACHED] Retrieved context ← varies by query
[NOT CACHED] Conversation history ← grows per turn
[NOT CACHED] Current user message ← new each turn
Place cache_control: {"type": "ephemeral"} at each transition boundary (max 4 breakpoints per request).
These are architectural constraints, not optimization tips. Violating any one destroys cache hit rates:
- Serialize deterministically: use sort_keys=True in Python. Non-deterministic serialization breaks caches.
- Do not change tool_choice, web search, citations, or image presence between turns in the same conversation.

Including dozens of full tool schemas is expensive. Removing tools mid-conversation breaks the cache. Solution: send lightweight stubs with defer_loading: true.
# Stub tools — always present, same order, stable prefix
tools = [
{"name": "search_database", "defer_loading": True},
{"name": "analyze_code", "defer_loading": True},
{"name": "tool_search", ...full_schema...}, # Discovery tool has full schema
]
# Model calls tool_search to discover full schemas on demand
# Full schemas loaded only when selected — prefix stays stable
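The deterministic-serialization rule is easy to check mechanically. A minimal sketch, assuming tool definitions are plain dicts; `serialize_stable` and `prefix_fingerprint` are hypothetical helper names, not part of any SDK:

```python
import hashlib
import json


def serialize_stable(obj) -> str:
    """Serialize with sorted keys and fixed separators so equal data yields equal text."""
    return json.dumps(obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False)


def prefix_fingerprint(system_prompt: str, tools: list[dict]) -> str:
    """Hash the cacheable prefix; a changed hash between turns signals a cache break."""
    payload = serialize_stable({"system": system_prompt, "tools": tools})
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


# Same content, different key insertion order.
tool_a = {"name": "search_database", "description": "Query rows"}
tool_b = {"description": "Query rows", "name": "search_database"}
other = {"name": "analyze_code"}

# Key order inside a tool does not change the fingerprint...
same = prefix_fingerprint("sys", [tool_a]) == prefix_fingerprint("sys", [tool_b])
# ...but reordering the tool list is a real prefix change.
changed = prefix_fingerprint("sys", [tool_a, other]) != prefix_fingerprint("sys", [other, tool_a])
```

Logging the fingerprint per request is one way to spot accidental prefix changes before they show up as a drop in cache hit rate.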
When you need modes (plan mode, review mode, etc.), don't swap the tool set — add mode tools:
# BAD: Swap tools for plan mode → cache break
tools = read_only_tools if plan_mode else all_tools
# GOOD: Keep all tools, add mode transitions as tools themselves
tools = all_tools + [enter_plan_mode_tool, exit_plan_mode_tool]
# Agent can autonomously enter plan mode without cache break
Assign token budgets per context type. When the total exceeds the model's effective window, trim the lowest-priority category first.
CONTEXT_BUDGET = {
"system_prompt": 2000, # Fixed, never trimmed
"tools": 3000, # Fixed per session
"retrieved_docs": 4000, # Trimmed by relevance score
"conversation_history": 6000, # Trimmed by recency (sliding window)
"current_input": 2000, # Never trimmed
"buffer": 3000, # Reserved for model output
}
# Total: 20K tokens — well within any model's window
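The trimming rule can be sketched as below. This is an illustration under stated assumptions: `TRIM_ORDER` and `fit_to_window` are hypothetical names, and treating retrieved docs as lower priority than conversation history is an assumption, not something the budget table prescribes.

```python
# Assumed priority: categories earlier in this list are trimmed first.
# Categories not listed (system prompt, tools, current input, buffer) are never trimmed.
TRIM_ORDER = ["retrieved_docs", "conversation_history"]


def fit_to_window(usage: dict[str, int], window: int) -> dict[str, int]:
    """Return per-category token allowances that sum to at most `window`."""
    allowed = dict(usage)
    overflow = sum(allowed.values()) - window
    for category in TRIM_ORDER:
        if overflow <= 0:
            break
        cut = min(allowed.get(category, 0), overflow)
        allowed[category] = allowed.get(category, 0) - cut
        overflow -= cut
    if overflow > 0:
        raise ValueError(f"Fixed context alone exceeds the window by {overflow} tokens")
    return allowed


usage = {
    "system_prompt": 2000, "tools": 3000, "retrieved_docs": 4000,
    "conversation_history": 6000, "current_input": 2000, "buffer": 3000,
}
# 20K requested, 14K available: docs are dropped first, then history shrinks to 4000.
trimmed = fit_to_window(usage, window=14000)
```

The function only decides how many tokens each category may keep; choosing which documents or messages survive (by relevance or recency) stays with the caller.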
Don't load context eagerly at the start. Load it right before the step that needs it.
# BAD: Load everything upfront
context = await retrieve_all_docs(query)
result_1 = await step_1(context)
result_2 = await step_2(context) # step_2 doesn't even use docs
# GOOD: Load per-step
result_1 = await step_1(state) # No retrieval needed
docs = await retrieve_docs(result_1.refined_query) # Now we know what to retrieve
result_2 = await step_2(state, docs)
Compress context differently based on which model will receive it.
def prepare_context(docs: list[str], query: str, target_model: str) -> str:
    # summarize() and extract_relevant() are helpers assumed to be defined elsewhere
    if target_model == "haiku":
        # Haiku gets compressed summaries: it only needs key facts, so save tokens
        return "\n".join(summarize(doc, max_tokens=200) for doc in docs[:3])
    elif target_model == "sonnet":
        # Sonnet gets relevant excerpts with surrounding context
        return "\n---\n".join(extract_relevant(doc, query) for doc in docs[:5])
    else:  # opus
        # Opus gets full documents because it can reason across them
        return "\n---\n".join(docs)
Every LLM call you replace with a deterministic tool costs nothing in API fees, typically runs orders of magnitude faster, and returns the same output for the same input. The discipline: identify which chain links are "LLM-worthy" and which are tool-convertible.
| Task | LLM needed? | Replace with |
|------|------------|-------------|
| Parse JSON and extract field | No | json.loads() + key access |
| Classify into N known categories (after 100+ examples) | Often no | Fine-tuned classifier or keyword rules |
| Format data as table/markdown | No | Template/f-string |
| Check if output meets regex pattern | No | re.match() |
| Date/time calculations | No | datetime library |
| Routing based on task type | Usually no | Rule engine or decision tree |
| Entity extraction from structured text | Often no | Regex + NER library |
| Translate between data formats | No | Schema mapping function |
| Summarize a document | Yes | LLM (understanding required) |
| Generate creative content | Yes | LLM (generation required) |
| Multi-step reasoning | Yes | LLM (reasoning required) |
| Disambiguate vague intent | Yes | LLM (understanding required) |
# BEFORE: 3 LLM calls, ~$0.03, ~3 seconds
async def process_request(request: str):
    intent = await llm.generate(f"What is the intent of: {request}")
    params = await llm.generate(f"Extract parameters from: {request}")
    response = await llm.generate(f"Format this for the user: {params}")
    return response

# AFTER: 1 LLM call + 2 tools, ~$0.01, ~1.5 seconds
async def process_request(request: str):
    intent = intent_classifier.predict(request)        # scikit-learn: 2ms
    params = param_extractor.extract(request, intent)  # regex+pydantic: 1ms
    if intent.requires_generation:
        response = await llm.generate(f"Intent={intent}, params={params}: {request}")
    else:
        response = templates[intent.name].format(**params)  # template: 0ms
    return response
At the prices below, Opus costs 18.75x as much as Haiku per token, for both input and output. Sending a classification task to Opus is burning money.
Is this a simple, structured task (classify/extract/format/tag)?
  YES → Haiku ($0.80/$4.00 per MTok)
Does this require multi-step reasoning or complex analysis?
  NO → Sonnet ($3.00/$15.00 per MTok)
Does accuracy justify 5x the cost?
  NO → Sonnet
  YES → Opus ($15.00/$75.00 per MTok)
Should extended thinking be enabled?
  Math/logic/debugging? → YES, budget_tokens: 3-10K
  Standard mode producing wrong results? → YES, budget_tokens: 5-15K
  Otherwise → NO
Route by task type. No extra API calls. Deterministic.
ROUTING = {
"classify": "haiku", "extract": "haiku", "format": "haiku",
"tag": "haiku", "sentiment": "haiku", "route": "haiku",
"summarize": "sonnet", "code_gen": "sonnet", "analyze": "sonnet",
"code_review": "sonnet", "conversation": "sonnet",
"architect": "opus", "debug_complex": "opus", "research": "opus",
"multi_step_reasoning": "opus", "agentic_workflow": "opus",
}
Never switch models mid-conversation — it rebuilds the entire cache. Instead, fork a subagent:
# BAD: Switch model mid-conversation → cache rebuild
response = await haiku.invoke(messages=opus_conversation) # Full-price cache miss!
# GOOD: Fork a subagent with a handoff message
handoff = f"Task: {task_description}\nContext: {relevant_context}"
response = await haiku.invoke(messages=[{"role": "user", "content": handoff}])
# Haiku builds its own small, cheap cache. Opus cache stays intact.
Start cheap, escalate on low confidence. Most requests handled by the cheapest model.
Request → Haiku → [Quality gate] → Pass? → Return
                                   Fail? → Sonnet → [Quality gate] → Pass? → Return
                                                                      Fail? → Opus
When to use cascading: Variable-complexity inputs where you can evaluate quality programmatically.

Avoid when: Latency-critical (worst case = 3x latency).
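The cascade can be sketched as a loop over tiers with a programmatic quality gate. Everything here is illustrative: `cascade` is a hypothetical helper, the `fake_*` coroutines stand in for real model clients, and the JSON-shape check stands in for whatever gate fits your task.

```python
import asyncio
from typing import Awaitable, Callable

Generate = Callable[[str], Awaitable[str]]
QualityGate = Callable[[str], bool]


async def cascade(
    request: str, tiers: list[tuple[str, Generate]], gate: QualityGate
) -> tuple[str, str]:
    """Try tiers cheapest-first; return (tier_name, output) from the first that passes.

    The last tier's output is returned even if it fails the gate,
    because there is nowhere left to escalate.
    """
    if not tiers:
        raise ValueError("at least one tier is required")
    for name, generate in tiers:
        output = await generate(request)
        if gate(output):
            return name, output
    return name, output


# Stand-in models for demonstration only; real tiers would call an LLM client.
async def fake_haiku(request: str) -> str:
    return "unsure"


async def fake_sonnet(request: str) -> str:
    return '{"label": "billing"}'


def is_json_object(output: str) -> bool:
    return output.startswith("{") and output.endswith("}")


tier, answer = asyncio.run(
    cascade("classify this ticket", [("haiku", fake_haiku), ("sonnet", fake_sonnet)], is_json_object)
)
# Haiku's output fails the gate, so the request escalates and tier == "sonnet".
```

Because the tiers run sequentially, worst-case latency is the sum of all tiers, which is the reason cascading is a poor fit for latency-critical paths.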
Use Haiku to classify complexity, then route. Adds ~$0.001 per request.
complexity = await haiku.classify(request, categories=["simple", "moderate", "complex"])
model = {"simple": "haiku", "moderate": "sonnet", "complex": "opus"}[complexity]
| Optimization | Multiplier |
|-------------|-----------|
| Standard pricing | 1.0x |
| Prompt cache read | 0.1x input |
| Batch API | 0.5x all |
| Batch + cache read | 0.05x input |
| Model downgrade (Opus→Haiku) | 0.05x |
Combining model routing, caching, and batch processing where applicable can yield a 20-100x cost reduction, because the multipliers compound.
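To see how the multipliers compound, here is a small cost calculator using the per-MTok prices quoted in this document. `request_cost` is a hypothetical helper, and the model is deliberately simplified: it ignores any surcharge for writing to the cache and applies the batch discount to the whole request.

```python
# (input, output) USD per million tokens, as quoted in this document.
PRICE_PER_MTOK = {
    "haiku": (0.80, 4.00),
    "sonnet": (3.00, 15.00),
    "opus": (15.00, 75.00),
}

CACHE_READ_MULTIPLIER = 0.1  # cached input tokens cost 0.1x
BATCH_MULTIPLIER = 0.5       # batch requests cost 0.5x overall


def request_cost(
    model: str,
    input_tokens: int,
    output_tokens: int,
    cached_fraction: float = 0.0,
    batch: bool = False,
) -> float:
    """Estimate the USD cost of one request under the multipliers above."""
    in_price, out_price = PRICE_PER_MTOK[model]
    cached = input_tokens * cached_fraction
    fresh = input_tokens - cached
    cost = (
        fresh * in_price
        + cached * in_price * CACHE_READ_MULTIPLIER
        + output_tokens * out_price
    ) / 1_000_000
    return cost * BATCH_MULTIPLIER if batch else cost


baseline = request_cost("opus", input_tokens=10_000, output_tokens=500)
optimized = request_cost("haiku", 10_000, 500, cached_fraction=0.9, batch=True)
reduction = baseline / optimized  # roughly two orders of magnitude for this workload
```

The ratio depends heavily on the input-to-output mix and the cache hit rate, so it is worth running with numbers from your own traffic rather than treating any single figure as typical.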
Making agents smarter not by upgrading the model, but by upgrading the architecture around the model.
A 50-token plan can save 5000 tokens of wandering. Every agent doing non-trivial work should plan first.
async def plan_then_act(state: AgentState) -> dict:
    # Planning step: cheap, saves expensive wandering
    plan = await sonnet.generate(
        f"Given this task, list 3-5 concrete steps to complete it.\n"
        f"Task: {state['task']}\n"
        f"Available tools: {state['tool_names']}\n"
        f"Constraints: Be specific. Each step = one tool call."
    )
    # Execute the plan, not an open-ended loop
    for step in plan.steps:
        result = await execute_step(step, state)
        state = update_state(state, result)
    return state
A Haiku self-check saves expensive Opus retries. After each agent action, verify the output meets the goal.
async def reflect(output: str, goal: str) -> tuple[bool, str]:
    """Haiku-powered reflection: did we achieve the goal?"""
    verdict = await haiku.generate(
        f"Goal: {goal}\nOutput: {output[:1000]}\n"
        f"Did the output achieve the goal? YES or NO + one-line reason."
    )
    # Compare case-insensitively so "Yes" and "yes" also count as a pass
    passed = verdict.strip().upper().startswith("YES")
    return passed, verdict
Cost: ~$0.001 per check. Catches errors that would otherwise require a full Opus re-run at $0.10+.
Three layers, each serving a different purpose:
| Layer | What it stores | How it's accessed | Persistence |
|-------|---------------|-------------------|-------------|
| Working memory | Current context window contents | Always present in prompt | Per-turn |
| Episodic memory | Past execution traces, successes/failures | Retrieved by similarity when starting similar tasks | Per-session or longer |
| Semantic memory | Knowledge graph, facts, project structure | Retrieved by query when reasoning requires domain knowledge | Persistent |
Working memory is the most impactful to optimize. It's the context window. Use budget allocation, just-in-time retrieval, and message trimming.
Episodic memory is the highest-leverage addition. When an agent encounters a similar task to one it's done before, retrieving the previous execution trace (especially the plan and the mistakes) dramatically improves performance.
async def recall_episode(task: str) -> str | None:
    """Search past executions for similar tasks."""
    episodes = await vector_store.similarity_search(task, k=3)
    if episodes and episodes[0].score > 0.85:
        return f"Previous similar task:\n{episodes[0].plan}\nOutcome: {episodes[0].result}"
    return None
The quality of tool definitions determines agent effectiveness more than model choice.
# BAD tool
{"name": "search", "description": "Search for things"}
# GOOD tool
{
    "name": "search_codebase",
    "description": "Search project files by name pattern or content regex. "
                   "Use this when you need to find where something is defined. "
                   "Use Grep for content search, Glob for filename patterns.",
    "input_schema": {
        "type": "object",
        "properties": {
            "pattern": {"type": "string", "description": "Glob or regex pattern"},
            "search_type": {"type": "string", "enum": ["filename", "content"]},
            "max_results": {"type": "integer", "default": 20, "maximum": 100}
        },
        "required": ["pattern", "search_type"]
    }
}
Don't use the same retrieval strategy for every query. Route by query type.
Query → [Classify] → Factual lookup       → Simple retrieval (top-3 chunks)
                   → Comparative analysis → Multi-retrieval + cross-doc reasoning
                   → Creative/open-ended  → Minimal retrieval + free generation
                   → Current events       → Web search + retrieval
Combine semantic (embedding) and keyword (BM25) search. Neither alone is sufficient.
# Semantic search: good for meaning, bad for exact terms
semantic_results = await vector_store.similarity_search(query, k=10)
# Keyword search: good for exact terms, bad for paraphrases
keyword_results = await bm25_index.search(query, k=10)
# Reciprocal Rank Fusion: combine rankings
combined = reciprocal_rank_fusion(semantic_results, keyword_results, k=60)
top_results = combined[:5]
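The snippet above calls `reciprocal_rank_fusion` without defining it. A minimal sketch of the standard formula, where each document scores the sum of `1 / (k + rank)` over every ranking it appears in, is shown below. It assumes each ranking is a list of hashable document IDs, best first; adapting it to result objects from a particular vector store is left to the caller.

```python
from collections import defaultdict
from typing import Hashable, Sequence


def reciprocal_rank_fusion(*rankings: Sequence[Hashable], k: int = 60) -> list[Hashable]:
    """Fuse several best-first rankings into one list, highest fused score first."""
    scores: dict[Hashable, float] = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    return sorted(scores, key=scores.get, reverse=True)


semantic = ["a", "b", "c"]
keyword = ["b", "d", "a"]
fused = reciprocal_rank_fusion(semantic, keyword)
# "b" and "a" appear in both lists, so they outrank "d" and "c": ["b", "a", "d", "c"]
```

The constant `k` dampens the influence of top ranks; 60 is the value used in the call above, and a larger `k` flattens the difference between adjacent positions.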
After retrieval, re-rank with a cross-encoder for higher precision. The retriever casts a wide net; the re-ranker selects the best catch.
# Retrieve 20 candidates (fast, approximate)
candidates = await retriever.search(query, k=20)
# Re-rank with cross-encoder (slower, precise)
scored = cross_encoder.rank(query, [c.text for c in candidates])
top_5 = sorted(scored, key=lambda x: x.score, reverse=True)[:5]
Before passing retrieved chunks to the LLM, compress them to only the relevant portions. Full chunks waste tokens on irrelevant paragraphs.
async def compress_context(query: str, chunks: list[str]) -> str:
    """Use Haiku to extract only relevant sentences from each chunk."""
    compressed = await haiku.generate(
        f"Query: {query}\n\n"
        f"Extract ONLY the sentences relevant to this query:\n\n"
        + "\n---\n".join(chunks)
    )
    return compressed  # Typically 40-60% smaller than raw chunks
When sub-tasks are independent, execute them in parallel. This is the single biggest latency reduction for multi-step pipelines.
# SEQUENTIAL: 4 steps × 2s each = 8 seconds
result_1 = await analyze_code(state)
result_2 = await search_docs(state)
result_3 = await generate_tests(state)
result_4 = await check_security(state)
# PARALLEL: max(2s, 2s, 2s, 2s) = 2 seconds
result_1, result_2, result_3, result_4 = await asyncio.gather(
analyze_code(state),
search_docs(state),
generate_tests(state),
check_security(state),
)
Process items in parallel (map), then combine results (reduce). For large inputs, use recursive reduce.
# Map: process each file in parallel
file_analyses = await asyncio.gather(*[
analyze_file(f) for f in files
], return_exceptions=True)
# Reduce: synthesize findings (may need chunking if too many files)
combined = "\n---\n".join(
r for r in file_analyses if not isinstance(r, Exception)
)
summary = await sonnet.generate(f"Synthesize these analyses:\n{combined}")
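When the joined analyses would not fit in one prompt, the reduce step can be applied recursively: summarize in batches, then summarize the summaries. The sketch below assumes a caller-supplied async `summarize` function; `recursive_reduce` and `batch_size` are hypothetical names, and the fake summarizer exists only to make the example runnable.

```python
import asyncio
from typing import Awaitable, Callable

Summarize = Callable[[str], Awaitable[str]]
SEPARATOR = "\n---\n"


async def recursive_reduce(parts: list[str], summarize: Summarize, batch_size: int = 5) -> str:
    """Collapse `parts` into one summary, never passing more than `batch_size` parts per call."""
    if batch_size < 2:
        raise ValueError("batch_size must be at least 2 so each round shrinks the input")
    if not parts:
        return ""
    # Intermediate rounds: batches are independent, so summarize them in parallel.
    while len(parts) > batch_size:
        batches = [parts[i:i + batch_size] for i in range(0, len(parts), batch_size)]
        parts = list(await asyncio.gather(*[summarize(SEPARATOR.join(b)) for b in batches]))
    # Final round: one call over whatever remains.
    return await summarize(SEPARATOR.join(parts))


calls: list[str] = []


async def fake_summarize(text: str) -> str:
    """Stand-in for an LLM call; records its input and returns a numbered label."""
    calls.append(text)
    return f"summary#{len(calls)}"


# 12 parts with batch_size=5: three batch calls (5, 5, 2), then one final call.
final = asyncio.run(recursive_reduce([f"analysis {i}" for i in range(12)], fake_summarize))
```

Batching by part count is the simplest policy; batching by token count is closer to what a real context limit requires.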
Modern agents (Claude, GPT-4) can request multiple tools in a single turn. Always execute them in parallel.
tool_calls = [b for b in response.content if b.type == "tool_use"]
results = await asyncio.gather(*[
execute_tool(tc.name, tc.input) for tc in tool_calls
])
Without evals, optimization is guessing. The eval flywheel is the single most impactful practice:
Production failure → Extract (input, expected) pair → Add to eval suite
→ Run suite on proposed changes → Block regressions → System improves monotonically
| Type | When to use | Cost |
|------|------------|------|
| Deterministic (exact match, regex, contains) | Structured outputs, classifications | Free |
| Embedding similarity | Open-ended text where meaning matters more than wording | Cheap |
| LLM-as-judge | Quality assessment, style, correctness of reasoning | Expensive |
Block deployments that don't meet thresholds:
QUALITY_GATES = {
"accuracy": 0.85, # 85% of eval cases must pass
"regression_rate": 0.02, # Max 2% regressions from previous version
"latency_p95_ms": 5000, # 95th percentile under 5s
"cost_per_request": 0.05, # Max $0.05 average cost
}
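A gate check over those thresholds might look like the sketch below. `failed_gates` and `LOWER_IS_BETTER` are hypothetical names; the only assumption about semantics is the one the comments above already state, namely that accuracy is a minimum and the other three values are maximums.

```python
QUALITY_GATES = {
    "accuracy": 0.85,
    "regression_rate": 0.02,
    "latency_p95_ms": 5000,
    "cost_per_request": 0.05,
}

# Gates where the measured value must stay at or below the threshold.
LOWER_IS_BETTER = {"regression_rate", "latency_p95_ms", "cost_per_request"}


def failed_gates(metrics: dict[str, float], gates: dict[str, float] = QUALITY_GATES) -> list[str]:
    """Return a human-readable line per failed gate; an empty list means deploy may proceed."""
    failures = []
    for name, threshold in gates.items():
        value = metrics.get(name)
        if value is None:
            failures.append(f"{name}: no measurement")
        elif name in LOWER_IS_BETTER and value > threshold:
            failures.append(f"{name}: {value} exceeds max {threshold}")
        elif name not in LOWER_IS_BETTER and value < threshold:
            failures.append(f"{name}: {value} below min {threshold}")
    return failures


candidate = {"accuracy": 0.89, "regression_rate": 0.01, "latency_p95_ms": 6200, "cost_per_request": 0.03}
problems = failed_gates(candidate)  # only the latency gate fails for this candidate
```

Treating a missing measurement as a failure keeps a broken metrics pipeline from silently waving a deployment through.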
When redesigning a pipeline, run old and new versions side-by-side:
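import hashlib

# Route ~10% of traffic to the new architecture.
# Python's built-in hash() is salted per process for strings, so use a stable digest
# to keep each request_id in the same bucket across workers and restarts.
bucket = int(hashlib.sha256(str(request_id).encode()).hexdigest(), 16) % 10
if bucket == 0:
    result = await new_pipeline(request)
    log_experiment("treatment", result, quality_score)  # quality_score: assumed to come from your evaluator
else:
    result = await old_pipeline(request)
    log_experiment("control", result, quality_score)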
| Layer | What it caches | Discount | When to use |
|-------|---------------|----------|-------------|
| Prompt caching (Anthropic) | Input token prefixes | 90% off input | Any repeated system prompt > 1024 tokens |
| Response caching (LangChain) | Full LLM responses for identical inputs | 100% (no API call) | Deterministic tasks (classification, extraction) |
| Application caching (Redis/memory) | Computed results, embeddings, retrieved docs | 100% (no API call) | Expensive computations, hot data |
# Cache hierarchy: tools → system → messages
# Changing anything earlier invalidates everything later
system = [
{"type": "text", "text": system_prompt, "cache_control": {"type": "ephemeral"}}, # BP 1
]
tools = [
# ... tool definitions ...
{**last_tool, "cache_control": {"type": "ephemeral"}}, # BP 2 on last tool
]
messages = [
*history,
{"role": "user", "content": [
{"type": "text", "text": query, "cache_control": {"type": "ephemeral"}} # BP 3
]}
]
Critical rules:
- Serialize JSON deterministically (sort_keys=True)
- Changing tool_choice, web search, or image presence breaks the cache
- The default cache lifetime is 5 minutes; extend it with "ttl": "1h"

When the context window fills up, you need to summarize and continue. The naive approach (a separate API call with a different system prompt) pays full price for all input tokens. The correct approach: fork with the exact same prefix.
# BAD: Separate compaction call — cache miss, full price
summary = await llm.invoke(
system="You are a summarizer", # Different prefix → no cache hit!
messages=[{"role": "user", "content": f"Summarize: {conversation}"}]
)
# GOOD: Cache-safe forking — reuse parent's exact prefix
summary = await llm.invoke(
system=parent_system_prompt, # Same as main conversation
tools=parent_tools, # Same tools, same order
messages=[
*parent_conversation_messages, # Same history → cache hit (1/10 price)
{"role": "user", "content": "Summarize this conversation for continuity."}
]
)
# Only the new "Summarize..." message is uncached. Everything else hits the cache.
Anthropic built compaction directly into the Messages API. Use it for long-running conversations and agentic workflows:
response = client.beta.messages.create(
    betas=["compact-2026-01-12"],
    model="claude-opus-4-6",
    max_tokens=4096,
    system=[{"type": "text", "text": system_prompt,
             "cache_control": {"type": "ephemeral"}}],  # Stays cached across compactions
    messages=messages,
    context_management={
        "edits": [{
            "type": "compact_20260112",
            "trigger": {"type": "input_tokens", "value": 150000},
            "pause_after_compaction": True,  # Re-attach files/context after summary
        }]
    },
)
if response.stop_reason == "compaction":
    messages.append({"role": "assistant", "content": response.content})
    # Re-attach critical context, then continue
    response = client.beta.messages.create(...)
Key behaviors:
- pause_after_compaction lets you re-attach files/context after the summary
- The compaction block can have cache_control for subsequent cache hits
- A system prompt with cache_control stays cached across compaction events
- Token usage is reported in the usage.iterations array (includes the compaction iteration separately)

Don't block the user for 30-60s when context fills up. Build summaries proactively:
Soft threshold (e.g., 75% full) → Trigger background summarization
Hard threshold (e.g., 95% full) → Instant swap with pre-built summary
Background summarization shares the main conversation's cache prefix → ~80% cheaper than building from scratch.
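The two thresholds amount to a small decision function evaluated after every turn. The sketch below is one possible shape, not a prescribed design: `Action`, `next_action`, and the blocking fallback for the case where no summary was pre-built in time are all assumptions.

```python
from enum import Enum


class Action(Enum):
    NONE = "none"
    START_BACKGROUND_SUMMARY = "start_background_summary"
    SWAP_IN_SUMMARY = "swap_in_summary"
    BLOCKING_SUMMARY = "blocking_summary"  # fallback when no summary was pre-built


def next_action(
    used_tokens: int,
    window: int,
    summary_ready: bool,
    summary_running: bool = False,
    soft: float = 0.75,
    hard: float = 0.95,
) -> Action:
    """Decide what to do about compaction given how full the context window is."""
    fill = used_tokens / window
    if fill >= hard:
        # Out of room: swap instantly if a summary exists, otherwise the user has to wait.
        return Action.SWAP_IN_SUMMARY if summary_ready else Action.BLOCKING_SUMMARY
    if fill >= soft and not summary_ready and not summary_running:
        return Action.START_BACKGROUND_SUMMARY
    return Action.NONE
```

The `summary_running` flag prevents a second background job from being started on every turn between the soft and hard thresholds.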
# Only cache deterministic nodes — never creative/generative ones
deterministic_llm = ChatAnthropic(model="claude-3-5-haiku-20241022", cache=True)
creative_llm = ChatAnthropic(model="claude-sonnet-4-20250514", cache=False)
For every LLM call in the system, document:
┌─────────────────────────────────────────────────────┐
│ Call ID │ Model │ Input tokens │ Output tokens │ Cost │
│ Purpose │ Could this be a tool? │ Dependencies │ │
│ Context: what's in the prompt? │ Latency │ │
└─────────────────────────────────────────────────────┘
Draw the flow: which calls depend on which? Which are parallel? Which are sequential? Where does context flow?
For each call, measure:
- Token counts from response.usage

Tag costs by pipeline, customer, and step. The most expensive step is your optimization target.
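Cost tagging can be as simple as recording one row per call and grouping by tag. The sketch below assumes token counts have already been read from each response; `CallRecord` and `cost_by` are hypothetical names, the sample records are invented, and the prices are the per-MTok figures quoted earlier in this document.

```python
from collections import defaultdict
from dataclasses import dataclass

# (input, output) USD per million tokens, as quoted in this document.
PRICE_PER_MTOK = {"haiku": (0.80, 4.00), "sonnet": (3.00, 15.00), "opus": (15.00, 75.00)}


@dataclass(frozen=True)
class CallRecord:
    pipeline: str
    customer: str
    step: str
    model: str
    input_tokens: int
    output_tokens: int

    @property
    def cost(self) -> float:
        in_price, out_price = PRICE_PER_MTOK[self.model]
        return (self.input_tokens * in_price + self.output_tokens * out_price) / 1_000_000


def cost_by(records: list[CallRecord], tag: str) -> dict[str, float]:
    """Sum cost per value of `tag`, where tag is 'pipeline', 'customer', or 'step'."""
    totals: dict[str, float] = defaultdict(float)
    for record in records:
        totals[getattr(record, tag)] += record.cost
    return dict(totals)


records = [
    CallRecord("support", "acme", "classify", "haiku", 1_000, 10),
    CallRecord("support", "acme", "draft_reply", "opus", 4_000, 600),
    CallRecord("support", "globex", "draft_reply", "opus", 3_000, 500),
]
by_step = cost_by(records, "step")
target = max(by_step, key=by_step.get)  # the most expensive step: "draft_reply"
```

The same records answer the per-customer and per-pipeline questions by changing the `tag` argument, which is the point of tagging at write time rather than reconstructing it from invoices.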
In priority order:
After every redesign:
Before: 45 LLM calls, $0.85/run, 12s latency, 87% accuracy
After: 18 LLM calls, $0.12/run, 4s latency, 89% accuracy
↑ ↑ ↑ ↑
chain-to-tool routing parallel better context
| Anti-pattern | Problem | Fix |
|---|---|---|
| God prompt | 10K-token system prompt with everything | Split into base + conditional injection |
| Token dumping | Full documents in context when excerpts suffice | Contextual compression, just-in-time retrieval |
| Linear chains | Sequential when steps are independent | Fan-out/fan-in parallel execution |
| One model fits all | Opus for classification, Haiku for reasoning | Route by task complexity |
| Hope-based quality | No evals, "it seems to work" | Eval suite with quality gates |
| Stateful spaghetti | Full state passed to every node | Subgraph isolation, state scoping |
| Retry storms | Retry failed calls without limit or backoff | Max retries + exponential backoff + fallback |
| Cache-breaking mutations | Toggling tool_choice or parameters between calls | Stable prefixes, consistent configuration |
| System prompt modifications | Updating system prompt mid-conversation | Send system messages in later turns instead |
| Model switching mid-session | Swapping models destroys the cache | Use subagent forking with handoff messages |
| Tool swapping for modes | Removing/adding tools for plan mode, review mode | Keep all tools, add mode transition tools |
| Reactive compaction | Blocking the user to summarize when context fills | Background compaction with soft/hard thresholds |
| Premature multi-agent | 5 agents when 1 agent + good tools would work | Start with single agent, add agents only when proven necessary |
| No cost attribution | "The AI bill is $5K/month" with no breakdown | Per-step, per-pipeline, per-customer cost tracking |