skills/stack/langgraph-patterns/SKILL.md
# LangGraph Patterns Patterns for building reliable, maintainable AI agent workflows with LangGraph. Graphs should have typed state, focused nodes, explicit routing, and proper error handling. --- ## Typed State Every graph MUST define its state as a `TypedDict`. The state is the single source of truth flowing through the graph. ```python from typing import TypedDict, Annotated from langgraph.graph import add_messages class AgentState(TypedDict): messages: Annotated[list, add_messages]
npx skillsauth add 33prime/rtg-forge skills/stack/langgraph-patternsInstall this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
3 of 9 scanners reported clean
Some scanners were skipped, did not run, or reported a non-clean status. Review each row below.
Patterns for building reliable, maintainable AI agent workflows with LangGraph. Graphs should have typed state, focused nodes, explicit routing, and proper error handling.
Every graph MUST define its state as a TypedDict. The state is the single source of truth flowing through the graph.
from typing import TypedDict, Annotated
from langgraph.graph import add_messages
class AgentState(TypedDict):
messages: Annotated[list, add_messages] # Chat history with reducer
context: str # Retrieved context
plan: list[str] # Action plan steps
current_step: int # Progress tracker
error: str | None # Error state
final_answer: str | None # Output
TypedDict — never raw dictsAnnotated with reducers for append-only fields (like messages)error field for error propagationEach node is a function that takes state, performs ONE operation, and returns a state update.
async def retrieve_context(state: AgentState) -> dict:
"""Retrieve relevant context from the knowledge base."""
query = state["messages"][-1].content
try:
docs = await retriever.ainvoke(query)
context = "\n\n".join(doc.page_content for doc in docs)
return {"context": context}
except RetrieverError as e:
return {"error": f"Context retrieval failed: {e}"}
async def generate_response(state: AgentState) -> dict:
"""Generate a response using the LLM with retrieved context."""
if state.get("error"):
return {} # Skip if previous node errored
prompt = RESPONSE_PROMPT.format(
context=state["context"],
question=state["messages"][-1].content,
)
response = await llm.ainvoke([
SystemMessage(content=prompt),
*state["messages"],
])
return {"messages": [response], "final_answer": response.content}
error fieldBuild graphs with explicit edges and clear flow:
from langgraph.graph import StateGraph, END
def build_rag_graph() -> StateGraph:
"""Build a RAG pipeline graph."""
graph = StateGraph(AgentState)
# Add nodes
graph.add_node("retrieve", retrieve_context)
graph.add_node("generate", generate_response)
graph.add_node("validate", validate_response)
graph.add_node("handle_error", handle_error)
# Set entry point
graph.set_entry_point("retrieve")
# Add edges
graph.add_edge("retrieve", "check_retrieval")
graph.add_conditional_edges(
"check_retrieval",
route_after_retrieval,
{
"success": "generate",
"error": "handle_error",
},
)
graph.add_edge("generate", "validate")
graph.add_conditional_edges(
"validate",
route_after_validation,
{
"valid": END,
"invalid": "generate", # Retry
"error": "handle_error",
},
)
graph.add_edge("handle_error", END)
return graph.compile()
Use routing functions to direct flow based on state:
def route_after_retrieval(state: AgentState) -> str:
"""Route based on retrieval result."""
if state.get("error"):
return "error"
if not state.get("context"):
return "error"
return "success"
def route_after_validation(state: AgentState) -> str:
"""Route based on validation result."""
if state.get("error"):
return "error"
if state.get("validation_passed"):
return "valid"
if state.get("current_step", 0) >= MAX_RETRIES:
return "error"
return "invalid"
Integrate tools through LangGraph's tool node pattern:
from langchain_core.tools import tool
from langgraph.prebuilt import ToolNode
@tool
def search_database(query: str, limit: int = 10) -> str:
"""Search the invoice database for matching records.
Args:
query: Search query string
limit: Maximum number of results to return
"""
results = db.search(query, limit=limit)
return json.dumps([r.to_dict() for r in results])
@tool
def calculate_total(invoice_id: str) -> str:
"""Calculate the total for an invoice including tax.
Args:
invoice_id: The UUID of the invoice
"""
invoice = db.get_invoice(invoice_id)
total = sum(li.quantity * li.unit_price for li in invoice.line_items)
tax = total * Decimal("0.08")
return json.dumps({"subtotal": str(total), "tax": str(tax), "total": str(total + tax)})
# Create tool node
tools = [search_database, calculate_total]
tool_node = ToolNode(tools)
# Bind tools to LLM
llm_with_tools = llm.bind_tools(tools)
Use checkpointing for long-running graphs and human-in-the-loop patterns:
from langgraph.checkpoint.memory import MemorySaver
from langgraph.checkpoint.postgres import PostgresSaver
# Development: in-memory checkpointing
memory = MemorySaver()
graph = build_graph().compile(checkpointer=memory)
# Production: persistent checkpointing
checkpointer = PostgresSaver.from_conn_string(DATABASE_URL)
graph = build_graph().compile(checkpointer=checkpointer)
# Invoke with thread_id for session persistence
config = {"configurable": {"thread_id": "user-session-123"}}
result = await graph.ainvoke(initial_state, config=config)
# Resume from checkpoint
result = await graph.ainvoke(None, config=config) # Continues from last state
thread_id to scope state per conversation/userDesign explicit error paths, not try/except around the whole graph:
async def handle_error(state: AgentState) -> dict:
"""Handle errors gracefully and produce a user-friendly response."""
error_msg = state.get("error", "An unknown error occurred")
return {
"messages": [AIMessage(content=f"I encountered an issue: {error_msg}. Please try again.")],
"final_answer": None,
}
state["error"] instead of raising exceptionsLong-running agents and multi-turn conversations will exceed the context window. Plan for this from the start — retroffitting compaction is expensive.
Combine LangGraph's trim_messages with Anthropic's Compaction API for layered context management:
from langchain_core.messages import trim_messages, RemoveMessage
class AgentState(TypedDict):
messages: Annotated[list, add_messages]
compaction_count: int # Track compaction cycles for budget enforcement
cumulative_tokens: int # Estimated total tokens consumed across compactions
async def call_model(state: AgentState) -> dict:
"""LLM call with message trimming — first line of defense."""
trimmed = trim_messages(
state["messages"],
max_tokens=100_000, # Soft limit — well below context window
strategy="last",
token_counter=llm,
include_system=True,
)
response = await llm.ainvoke(trimmed)
return {"messages": [response]}
Checkpoints store the full state including messages. When compaction summarizes messages, the checkpoint captures the compacted version. Key rules:
compaction_count in state so it survives checkpointsTRIGGER = 100_000
BUDGET = 3_000_000
async def check_budget(state: AgentState) -> dict:
"""Gate node — enforce total token budget across compaction cycles."""
total = state.get("compaction_count", 0) * TRIGGER
if total >= BUDGET:
return {
"messages": [AIMessage(content="Wrapping up — token budget reached.")],
"error": "budget_exceeded",
}
return {}
Long-running agents accumulate tool call/result pairs that are no longer relevant. Remove them to keep context lean:
async def cleanup_stale_messages(state: AgentState) -> dict:
"""Remove tool messages older than the last N turns."""
messages = state["messages"]
keep_after = max(0, len(messages) - 20) # Keep last 20 messages
removals = []
for i, msg in enumerate(messages[:keep_after]):
if msg.type in ("tool", "tool_call"):
removals.append(RemoveMessage(id=msg.id))
return {"messages": removals}
compaction_count in state for budget enforcementtrim_messages before every LLM call as first defensecache_control breakpoint so it survives compactionNever hardcode prompts in node functions. Use templates with clear variables:
RETRIEVAL_PROMPT = """You are a helpful assistant answering questions about invoices.
Context from the knowledge base:
{context}
User question: {question}
Instructions:
- Answer based ONLY on the provided context
- If the context doesn't contain the answer, say so
- Cite specific parts of the context in your answer
"""
# In the node:
prompt = RETRIEVAL_PROMPT.format(
context=state["context"],
question=state["messages"][-1].content,
)
{name} format for .format() substitutiondevelopment
# Parallel Execution > This skill is under development. Workflow patterns for running independent tasks in parallel to improve performance and throughput. ## Topics to Cover - Identifying independent tasks suitable for parallel execution - `asyncio.gather()` with `return_exceptions=True` - `asyncio.TaskGroup` for structured concurrency (Python 3.11+) - Semaphores for bounded concurrency - `Promise.all()` and `Promise.allSettled()` in TypeScript - Handling partial failures (some tasks succeed
development
# Module Extraction > This skill is under development. Workflow for identifying and extracting reusable modules from existing codebases. Extract when a pattern is used in 3+ places and has stabilized. ## Topics to Cover - Identifying extraction candidates (rule of three) - Defining module boundaries and public interface - Dependency analysis: what does the module need? - Interface design: protocols, abstract base classes - Step-by-step extraction process - Testing strategy: tests before, dur
development
# Forge Orchestrate — Intelligent Build Orchestration You are a build planner, not a build executor. Your job is to look at a project, figure out what's left to build, decompose the work into parallel streams, assign the right intelligence level to each stream, estimate cost, and hand the user a set of terminal commands they can run. You plan. They execute. --- ## Stream Decomposition The unit of parallelism is a **stream** — a self-contained bundle of tasks that one Claude session handles e
development
# Code Review > This skill is under development. Workflow for conducting effective code reviews that catch real issues and improve code quality. ## Topics to Cover - Review priorities: correctness > design > performance > style - What to check in every review (checklist) - How to give constructive feedback - Automated checks that should run before human review - Review scope: how big is too big? - Patterns for reviewing database migrations - Patterns for reviewing API changes - When to reque