.cursor/skills/langgraph-fundamentals/SKILL.md
INVOKE THIS SKILL when writing ANY LangGraph code. Covers StateGraph, state schemas, nodes, edges, Command, Send, invoke, streaming, and error handling.
Graphs must be compile()d before execution.
</overview>
Follow these 5 steps when building a new graph:
| Use LangGraph When | Use Alternatives When |
|-------------------|----------------------|
| Need fine-grained control over agent orchestration | Quick prototyping → LangChain agents |
| Building complex workflows with branching/loops | Simple stateless workflows → LangChain direct |
| Require human-in-the-loop, persistence | Batteries-included features → Deep Agents |
</when-to-use-langgraph>
<state-update-strategies>
| Need | Solution | Example |
|------|----------|---------|
| Overwrite value | No reducer (default) | Simple fields like counters |
| Append to list | Reducer (`operator.add` / concat) | Message history, logs |
| Custom logic | Custom reducer function | Complex merging |
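The "Custom logic" row in the table above can be sketched as follows. A reducer is a plain function that takes the current value and the incoming update and returns the merged value. `merge_unique` and `TagState` are hypothetical names used only for illustration, and the last lines call the reducer directly instead of running a graph.

```python
from typing import Annotated, TypedDict


def merge_unique(current: list[str], update: list[str]) -> list[str]:
    """Hypothetical custom reducer: append only items not already present."""
    merged = list(current)  # copy so the existing state value is not mutated
    for item in update:
        if item not in merged:
            merged.append(item)
    return merged


class TagState(TypedDict):
    # The reducer is attached through Annotated, like operator.add
    tags: Annotated[list[str], merge_unique]


# Calling the reducer directly shows the merge applied to successive updates
first = merge_unique([], ["a", "b"])
second = merge_unique(first, ["b", "c"])
print(second)  # ['a', 'b', 'c']
```

Returning a new list rather than mutating `current` keeps the reducer free of side effects, which makes it easy to test in isolation.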
</state-update-strategies>
<ex-state-with-reducer>
<python>
Define state schema with reducers for accumulating lists and summing integers.
```python
from typing_extensions import TypedDict, Annotated
import operator

class State(TypedDict):
    name: str  # Default: overwrites on update
    messages: Annotated[list, operator.add]  # Appends to list
    total: Annotated[int, operator.add]  # Sums integers
```
</python>
<typescript>
Use StateSchema with ReducedValue for accumulating arrays.
```typescript
import { StateSchema, ReducedValue, MessagesValue } from "@langchain/langgraph";
import { z } from "zod";

const State = new StateSchema({
  name: z.string(), // Default: overwrites
  messages: MessagesValue, // Built-in for messages
  items: new ReducedValue(
    z.array(z.string()).default(() => []),
    { reducer: (current, update) => current.concat(update) }
  ),
});
```
</typescript>
</ex-state-with-reducer>
<fix-forgot-reducer-for-list>
<python>
Without a reducer, returning a list overwrites previous values.
```python
# WRONG: List will be OVERWRITTEN
class State(TypedDict):
    messages: list  # No reducer!

# CORRECT: Annotate the list with a reducer so updates are appended
from typing import Annotated
import operator

class State(TypedDict):
    messages: Annotated[list, operator.add]
```
</python>
<typescript>
Without ReducedValue, arrays are overwritten not appended.
```typescript
// WRONG: Array will be overwritten
const State = new StateSchema({
  items: z.array(z.string()), // No reducer!
});
// Node 1: { items: ["A"] }, Node 2: { items: ["B"] }
// Final: { items: ["B"] } // A is lost!

// CORRECT: Use ReducedValue
const State = new StateSchema({
  items: new ReducedValue(
    z.array(z.string()).default(() => []),
    { reducer: (current, update) => current.concat(update) }
  ),
});
// Final: { items: ["A", "B"] }
```
</typescript>
</fix-forgot-reducer-for-list>
<fix-state-must-return-dict>
<python>
Nodes must return partial updates, not mutate and return full state.
```python
# WRONG: Returning entire state object
def my_node(state: State) -> State:
    state["field"] = "updated"
    return state  # Don't mutate and return!

# CORRECT: Return partial updates
def my_node(state: State) -> dict:
    return {"field": "updated"}
```
</python>
<typescript>
Return partial updates only, not the full state object.
```typescript
// WRONG: Returning entire state
const myNode = async (state: typeof State.State) => {
  state.field = "updated";
  return state; // Don't do this!
};

// CORRECT: Return partial updates
const myNode = async (state: typeof State.State) => {
  return { field: "updated" };
};
```
</typescript>
</fix-state-must-return-dict>
<node-function-signatures>
Node functions accept these arguments:
<python>
| Signature | When to Use |
|-----------|-------------|
| `def node(state: State)` | Simple nodes that only need state |
| `def node(state: State, config: RunnableConfig)` | Need thread_id, tags, or configurable values |
| `def node(state: State, runtime: Runtime[Context])` | Need runtime context, store, or stream_writer |
```python
from langchain_core.runnables import RunnableConfig
from langgraph.runtime import Runtime

def plain_node(state: State):
    return {"results": "done"}

def node_with_config(state: State, config: RunnableConfig):
    thread_id = config["configurable"]["thread_id"]
    return {"results": f"Thread: {thread_id}"}

def node_with_runtime(state: State, runtime: Runtime[Context]):
    user_id = runtime.context.user_id
    return {"results": f"User: {user_id}"}
```
</python>
<typescript>
| Signature | When to Use |
|-----------|-------------|
| `(state) => {...}` | Simple nodes that only need state |
| `(state, config) => {...}` | Need thread_id, tags, or configurable values |
```typescript
import { GraphNode, StateSchema } from "@langchain/langgraph";

const plainNode: GraphNode<typeof State> = (state) => {
  return { results: "done" };
};

const nodeWithConfig: GraphNode<typeof State> = (state, config) => {
  const threadId = config?.configurable?.thread_id;
  return { results: `Thread: ${threadId}` };
};
```
</typescript>
</node-function-signatures>
| Need | Edge Type | When to Use |
|------|-----------|-------------|
| Always go to same node | `add_edge()` | Fixed, deterministic flow |
| Route based on state | `add_conditional_edges()` | Dynamic branching |
| Update state AND route | `Command` | Combine logic in single node |
| Fan-out to multiple nodes | `Send` | Parallel processing with dynamic inputs |
<ex-basic-graph>
<python>
Chain nodes with add_edge and compile before invoking.
```python
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    input: str
    output: str

def process_input(state: State) -> dict:
    return {"output": f"Processed: {state['input']}"}

def finalize(state: State) -> dict:
    return {"output": state["output"].upper()}

graph = (
    StateGraph(State)
    .add_node("process", process_input)
    .add_node("finalize", finalize)
    .add_edge(START, "process")
    .add_edge("process", "finalize")
    .add_edge("finalize", END)
    .compile()
)

result = graph.invoke({"input": "hello"})
print(result["output"])  # "PROCESSED: HELLO"
```
</python>
<typescript>
Chain nodes with addEdge and compile before invoking.
```typescript
import { StateGraph, StateSchema, START, END } from "@langchain/langgraph";
import { z } from "zod";

const State = new StateSchema({
  input: z.string(),
  output: z.string().default(""),
});

const processInput = async (state: typeof State.State) => {
  return { output: `Processed: ${state.input}` };
};

const finalize = async (state: typeof State.State) => {
  return { output: state.output.toUpperCase() };
};

const graph = new StateGraph(State)
  .addNode("process", processInput)
  .addNode("finalize", finalize)
  .addEdge(START, "process")
  .addEdge("process", "finalize")
  .addEdge("finalize", END)
  .compile();

const result = await graph.invoke({ input: "hello" });
console.log(result.output); // "PROCESSED: HELLO"
```
</typescript>
</ex-basic-graph>
<ex-conditional-edges>
<python>
Route to different nodes based on state with conditional edges.
```python
from typing import Literal
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    query: str
    route: str
    result: str

def classify(state: State) -> dict:
    if "weather" in state["query"].lower():
        return {"route": "weather"}
    return {"route": "general"}

def route_query(state: State) -> Literal["weather", "general"]:
    return state["route"]

graph = (
    StateGraph(State)
    .add_node("classify", classify)
    .add_node("weather", lambda s: {"result": "Sunny, 72F"})
    .add_node("general", lambda s: {"result": "General response"})
    .add_edge(START, "classify")
    .add_conditional_edges("classify", route_query, ["weather", "general"])
    .add_edge("weather", END)
    .add_edge("general", END)
    .compile()
)
```
</python>
<typescript>
addConditionalEdges routes based on function return value.
```typescript
import { StateGraph, StateSchema, START, END } from "@langchain/langgraph";
import { z } from "zod";

const State = new StateSchema({
  query: z.string(),
  route: z.string().default(""),
  result: z.string().default(""),
});

const classify = async (state: typeof State.State) => {
  if (state.query.toLowerCase().includes("weather")) {
    return { route: "weather" };
  }
  return { route: "general" };
};

const routeQuery = (state: typeof State.State) => state.route;

const graph = new StateGraph(State)
  .addNode("classify", classify)
  .addNode("weather", async () => ({ result: "Sunny, 72F" }))
  .addNode("general", async () => ({ result: "General response" }))
  .addEdge(START, "classify")
  .addConditionalEdges("classify", routeQuery, ["weather", "general"])
  .addEdge("weather", END)
  .addEdge("general", END)
  .compile();
```
</typescript>
</ex-conditional-edges>
Command combines state updates and routing in a single return value. Fields:
- `update`: State updates to apply (like returning a dict from a node)
- `goto`: Node name(s) to navigate to next
- `resume`: Value to resume after `interrupt()` (see human-in-the-loop skill)
<ex-command-state-and-routing>
<python>
Return Command with update and goto to combine state change with routing.
```python
from typing import Literal
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command

class State(TypedDict):
    count: int
    result: str

def node_a(state: State) -> Command[Literal["node_b", "node_c"]]:
    """Update state AND decide next node in one return."""
    new_count = state["count"] + 1
    if new_count > 5:
        return Command(update={"count": new_count}, goto="node_c")
    return Command(update={"count": new_count}, goto="node_b")

graph = (
    StateGraph(State)
    .add_node("node_a", node_a)
    .add_node("node_b", lambda s: {"result": "B"})
    .add_node("node_c", lambda s: {"result": "C"})
    .add_edge(START, "node_a")
    .add_edge("node_b", END)
    .add_edge("node_c", END)
    .compile()
)
```
</python>
<typescript>
Return Command with update and goto to combine state change with routing.
```typescript
import { StateGraph, StateSchema, START, END, Command } from "@langchain/langgraph";
import { z } from "zod";

const State = new StateSchema({
  count: z.number().default(0),
  result: z.string().default(""),
});

const nodeA = async (state: typeof State.State) => {
  const newCount = state.count + 1;
  if (newCount > 5) {
    return new Command({ update: { count: newCount }, goto: "node_c" });
  }
  return new Command({ update: { count: newCount }, goto: "node_b" });
};

const graph = new StateGraph(State)
  .addNode("node_a", nodeA, { ends: ["node_b", "node_c"] })
  .addNode("node_b", async () => ({ result: "B" }))
  .addNode("node_c", async () => ({ result: "C" }))
  .addEdge(START, "node_a")
  .addEdge("node_b", END)
  .addEdge("node_c", END)
  .compile();
```
</typescript>
</ex-command-state-and-routing>
<command-return-type-annotations>
Python: Use `Command[Literal["node_a", "node_b"]]` as the return type annotation to declare valid goto destinations.
TypeScript: Pass `{ ends: ["node_a", "node_b"] }` as the third argument to `addNode` to declare valid goto destinations.
Warning: `Command` only adds dynamic edges. Static edges defined with `add_edge` / `addEdge` still execute. If node_a returns `Command(goto="node_c")` and you also have `graph.add_edge("node_a", "node_b")`, both node_b and node_c will run.
</command-return-type-annotations>
Fan-out with Send: return `[Send("worker", {...})]` from a conditional edge to spawn parallel workers. Requires a reducer on the results field.
<ex-orchestrator-worker>
<python>
Fan out tasks to parallel workers using the Send API and aggregate results.
```python
from typing import Annotated
import operator
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.types import Send

class OrchestratorState(TypedDict):
    tasks: list[str]
    results: Annotated[list, operator.add]
    summary: str

def orchestrator(state: OrchestratorState):
    """Fan out tasks to workers."""
    return [Send("worker", {"task": task}) for task in state["tasks"]]

def worker(state: dict) -> dict:
    return {"results": [f"Completed: {state['task']}"]}

def synthesize(state: OrchestratorState) -> dict:
    return {"summary": f"Processed {len(state['results'])} tasks"}

graph = (
    StateGraph(OrchestratorState)
    .add_node("worker", worker)
    .add_node("synthesize", synthesize)
    .add_conditional_edges(START, orchestrator, ["worker"])
    .add_edge("worker", "synthesize")
    .add_edge("synthesize", END)
    .compile()
)

result = graph.invoke({"tasks": ["Task A", "Task B", "Task C"]})
```
</python>
<typescript>
Fan out tasks to parallel workers using the Send API and aggregate results.
```typescript
import { Send, StateGraph, StateSchema, ReducedValue, START, END } from "@langchain/langgraph";
import { z } from "zod";

const State = new StateSchema({
  tasks: z.array(z.string()),
  results: new ReducedValue(
    z.array(z.string()).default(() => []),
    { reducer: (curr, upd) => curr.concat(upd) }
  ),
  summary: z.string().default(""),
});

const orchestrator = (state: typeof State.State) => {
  return state.tasks.map((task) => new Send("worker", { task }));
};

const worker = async (state: { task: string }) => {
  return { results: [`Completed: ${state.task}`] };
};

const synthesize = async (state: typeof State.State) => {
  return { summary: `Processed ${state.results.length} tasks` };
};

const graph = new StateGraph(State)
  .addNode("worker", worker)
  .addNode("synthesize", synthesize)
  .addConditionalEdges(START, orchestrator, ["worker"])
  .addEdge("worker", "synthesize")
  .addEdge("synthesize", END)
  .compile();
```
</typescript>
</ex-orchestrator-worker>
<fix-send-accumulator>
<python>
Use a reducer to accumulate parallel worker results (otherwise last worker overwrites).
```python
# WRONG: No reducer - last worker overwrites
class State(TypedDict):
    results: list

# CORRECT: Reducer accumulates results from all workers
class State(TypedDict):
    results: Annotated[list, operator.add]  # Accumulates
```
</python>
<typescript>
Use ReducedValue to accumulate parallel worker results.
```typescript
// WRONG: No reducer
const State = new StateSchema({ results: z.array(z.string()) });

// CORRECT
const State = new StateSchema({
  results: new ReducedValue(z.array(z.string()).default(() => []), { reducer: (curr, upd) => curr.concat(upd) }),
});
```
</typescript>
</fix-send-accumulator>
Call graph.invoke(input, config) to run a graph to completion and return the final state.
| Mode | What it Streams | Use Case |
|------|----------------|----------|
| `values` | Full state after each step | Monitor complete state |
| `updates` | State deltas | Track incremental updates |
| `messages` | LLM tokens + metadata | Chat UIs |
| `custom` | User-defined data | Progress indicators |
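The difference between the values and updates rows can be sketched with a toy runner in plain Python. This is a model for intuition only, not LangGraph's streaming implementation: `run_steps` and the step names are hypothetical, and reducers and the other modes are left out.

```python
from typing import Callable, Iterator

Step = tuple[str, Callable[[dict], dict]]


def run_steps(state: dict, steps: list[Step], stream_mode: str) -> Iterator[dict]:
    """Toy model of streaming: run steps in order and yield after each one."""
    state = dict(state)  # work on a copy of the input
    for name, fn in steps:
        update = fn(state)    # each step returns a partial update
        state.update(update)  # overwrite semantics; no reducers in this model
        if stream_mode == "values":
            yield dict(state)     # full state after the step
        elif stream_mode == "updates":
            yield {name: update}  # only what this step changed, keyed by step name
        else:
            raise ValueError(f"unsupported mode in this sketch: {stream_mode}")


steps = [
    ("process", lambda s: {"output": f"Processed: {s['input']}"}),
    ("finalize", lambda s: {"output": s["output"].upper()}),
]

values_chunks = list(run_steps({"input": "hello"}, steps, "values"))
updates_chunks = list(run_steps({"input": "hello"}, steps, "updates"))
print(values_chunks[-1])  # {'input': 'hello', 'output': 'PROCESSED: HELLO'}
print(updates_chunks[0])  # {'process': {'output': 'Processed: hello'}}
```

In this model a consumer that only needs to know what changed reads the smaller updates chunks, while one that renders the whole state reads values.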
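<ex-stream-custom-data>
<python>
Emit custom progress updates from within nodes using the stream writer.
```python
from langgraph.config import get_stream_writer

def my_node(state):
    writer = get_stream_writer()
    writer("Processing step 1...")
    # Do work
    writer("Complete!")
    return {"result": "done"}

for chunk in graph.stream({"data": "test"}, stream_mode="custom"):
    print(chunk)
```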
</python>
<typescript>
Emit custom progress updates from within nodes using the stream writer.
```typescript
import { getWriter } from "@langchain/langgraph";

const myNode = async (state: typeof State.State) => {
  const writer = getWriter();
  writer("Processing step 1...");
  // Do work
  writer("Complete!");
  return { result: "done" };
};

for await (const chunk of graph.stream({ data: "test" }, { streamMode: "custom" })) {
  console.log(chunk);
}
```
</typescript>
</ex-stream-custom-data>
Match the error type to the right handler:
<error-handling-table>
| Error Type | Who Fixes | Strategy | Example |
|---|---|---|---|
| Transient (network, rate limits) | System | `RetryPolicy(max_attempts=3)` | `add_node(..., retry_policy=...)` |
| LLM-recoverable (tool failures) | LLM | `ToolNode(tools, handle_tool_errors=True)` | Error returned as ToolMessage |
| User-fixable (missing info) | Human | `interrupt({"message": ...})` | Collect missing data (see HITL skill) |
| Unexpected | Developer | Let bubble up | `raise` |
</error-handling-table>
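For the transient row, what a retry policy does can be approximated in plain Python. This is a minimal sketch, assuming exponential backoff and a fixed set of retryable exceptions. `call_with_retry`, `backoff_factor`, `retry_on`, and `sleep` are hypothetical names, and this is not how `RetryPolicy` is implemented internally.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_retry(
    fn: Callable[[], T],
    max_attempts: int = 3,
    initial_interval: float = 1.0,
    backoff_factor: float = 2.0,
    retry_on: tuple[type[BaseException], ...] = (ConnectionError, TimeoutError),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Hypothetical helper: retry transient errors, let everything else bubble up."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    interval = initial_interval
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except retry_on:
            if attempt == max_attempts:
                raise  # attempts exhausted: surface the original error
            sleep(interval)
            interval *= backoff_factor  # wait longer before the next attempt
    raise AssertionError("unreachable")


calls = {"n": 0}


def flaky() -> str:
    """Fails twice with a transient error, then succeeds."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("temporary failure")
    return "ok"


delays: list[float] = []
result = call_with_retry(flaky, sleep=delays.append)  # record delays instead of sleeping
print(result, calls["n"], delays)  # ok 3 [1.0, 2.0]
```

Exceptions outside `retry_on` are not caught, which matches the "Unexpected" row: they propagate to the caller unchanged.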
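<ex-retry-policy>
<python>
Use retry_policy for transient errors.
```python
from langgraph.types import RetryPolicy

workflow.add_node(
    "search_documentation",
    search_documentation,
    retry_policy=RetryPolicy(max_attempts=3, initial_interval=1.0),
)
```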
</python>
<typescript>
Use retryPolicy for transient errors.
```typescript
workflow.addNode(
  "searchDocumentation",
  searchDocumentation,
  {
    retryPolicy: { maxAttempts: 3, initialInterval: 1.0 },
  },
);
```
</typescript>
</ex-retry-policy>
<ex-tool-node-error-handling>
<python>
Use ToolNode from langgraph.prebuilt to handle tool execution and errors. When handle_tool_errors=True, errors are returned as ToolMessages so the LLM can recover.
```python
from langgraph.prebuilt import ToolNode

tool_node = ToolNode(tools, handle_tool_errors=True)
workflow.add_node("tools", tool_node)
```
</python>
<typescript>
Use ToolNode from @langchain/langgraph/prebuilt to handle tool execution and errors. When handleToolErrors is true, errors are returned as ToolMessages so the LLM can recover.
```typescript
import { ToolNode } from "@langchain/langgraph/prebuilt";

const toolNode = new ToolNode(tools, { handleToolErrors: true });
workflow.addNode("tools", toolNode);
```
</typescript>
</ex-tool-node-error-handling>
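<fix-compile-before-execution>
<python>
Must compile() to get executable graph.
```python
# WRONG
builder.invoke({"input": "test"})

# CORRECT
graph = builder.compile()
graph.invoke({"input": "test"})
```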
</python>
<typescript>
Must compile() to get executable graph.
```typescript
// WRONG
await builder.invoke({ input: "test" });

// CORRECT
const graph = builder.compile();
await graph.invoke({ input: "test" });
```
</typescript>
</fix-compile-before-execution>
<fix-infinite-loop-needs-exit>
<python>
Provide conditional path to END to avoid infinite loops.
```python
# WRONG: Loops forever
builder.add_edge("node_a", "node_b")
builder.add_edge("node_b", "node_a")

# CORRECT: Conditional edge with a path to END
def should_continue(state):
    return END if state["count"] > 10 else "node_b"

builder.add_conditional_edges("node_a", should_continue)
```
</python>
<typescript>
Use conditional edges with END return to break loops.
```typescript
// WRONG: Loops forever
builder.addEdge("node_a", "node_b").addEdge("node_b", "node_a");

// CORRECT
builder.addConditionalEdges("node_a", (state) => state.count > 10 ? END : "node_b");
```
</typescript>
</fix-infinite-loop-needs-exit>
<fix-common-mistakes>
Other common mistakes:
```python
# Router must return names of nodes that exist in the graph
builder.add_node("my_node", func)  # Add node BEFORE referencing in edges
builder.add_conditional_edges("node_a", router, ["my_node"])

# Annotate Command nodes with Literal to declare routing destinations
def node_a(state) -> Command[Literal["node_b", "node_c"]]:
    return Command(goto="node_b")

# START is an entry point only; do not route back to it
builder.add_edge("node_a", START)  # WRONG!
builder.add_edge("node_a", "entry")  # Use a named entry node instead

# Updates must match the type the reducer expects
return {"items": ["item"]}  # List for list reducer, not a string
```
```typescript
// Always await graph.invoke() - it returns a Promise
const result = await graph.invoke({ input: "test" });

// TS Command nodes need { ends } to declare routing destinations
builder.addNode("router", routerFn, { ends: ["node_b", "node_c"] });
```
</fix-common-mistakes>
<boundaries>
### What You Should NOT Do