skills/ai-coordinating-agents/SKILL.md
Build multiple AI agents that work together. Use when you need a supervisor agent that delegates to specialists, agent handoff, parallel research agents, support escalation (L1 to L2), content pipeline (writer + editor + fact-checker), or any multi-agent system. Also used for CrewAI alternative, AutoGen alternative, LangGraph multi-agent, agents that talk to each other, specialist agents with a supervisor, agents keep stepping on each other, build an AI team, route tasks to the right agent, when one agent is not enough, parallel agents for research.
npx skillsauth add lebsral/dspy-programming-not-prompting-lms-skills ai-coordinating-agentsInstall this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
3 of 9 scanners reported clean
Some scanners were skipped, did not run, or reported a non-clean status. Review each row below.
Guide the user through building multiple AI agents that collaborate — a supervisor delegates tasks, specialists handle their domains, and results flow back. Uses DSPy for each agent's reasoning and LangGraph for orchestration, handoff, and parallel execution.
Ask the user:
| Pattern | How it works | Good for | |---------|-------------|----------| | Supervisor | Central agent routes tasks to specialists | Support triage, research coordination | | Chain | Agent A → Agent B → Agent C in sequence | Content pipelines (write → edit → review) | | Parallel | Multiple agents work simultaneously, merge results | Research (search multiple sources at once) | | Hierarchical | Supervisor → sub-supervisors → specialists | Complex organizations with many agents |
Each agent gets its own signature, reasoning strategy, and (optionally) tools.
import dspy
lm = dspy.LM("openai/gpt-4o-mini") # or "anthropic/claude-sonnet-4-5-20250929", etc.
dspy.configure(lm=lm)
import dspy
class ResearchSummary(dspy.Signature):
"""Research the topic and provide a detailed summary with key findings."""
topic: str = dspy.InputField()
sources: list[str] = dspy.InputField(desc="Search results or documents to analyze")
summary: str = dspy.OutputField(desc="Detailed research summary")
key_findings: list[str] = dspy.OutputField(desc="Top 3-5 key findings")
class ResearchAgent(dspy.Module):
def __init__(self, retriever):
self.retriever = retriever
self.analyze = dspy.ChainOfThought(ResearchSummary)
def forward(self, topic):
sources = self.retriever(topic).passages
return self.analyze(topic=topic, sources=sources)
def search_web(query: str) -> str:
"""Search the web for current information."""
# your search implementation
return results
def query_database(sql: str) -> str:
"""Query the analytics database."""
# your database implementation
return results
class DataAnalyst(dspy.Module):
def __init__(self):
self.agent = dspy.ReAct(
"question, context -> analysis, recommendation",
tools=[search_web, query_database],
max_iters=5,
)
def forward(self, question, context=""):
return self.agent(question=question, context=context)
Convert pre-built LangChain tools for use in DSPy agents:
from langchain_community.tools import DuckDuckGoSearchRun
search_tool = dspy.Tool.from_langchain(DuckDuckGoSearchRun())
class WebResearcher(dspy.Module):
def __init__(self):
self.agent = dspy.ReAct(
"question -> findings",
tools=[search_tool],
max_iters=5,
)
def forward(self, question):
return self.agent(question=question)
The supervisor decides which agent to call next based on the current state.
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Annotated
import operator
class TeamState(TypedDict):
task: str # the overall task
messages: Annotated[list[dict], operator.add] # communication log
current_agent: str # who's working now
results: dict # collected results from agents
status: str # "in_progress", "done", "needs_review"
class RouteTask(dspy.Signature):
"""Decide which specialist agent should handle the next step."""
task: str = dspy.InputField(desc="The overall task")
completed_work: str = dspy.InputField(desc="Work completed so far")
available_agents: list[str] = dspy.InputField()
next_agent: str = dspy.OutputField(desc="Which agent to call next")
sub_task: str = dspy.OutputField(desc="Specific instruction for that agent")
is_complete: bool = dspy.OutputField(desc="Whether the overall task is done")
supervisor_module = dspy.ChainOfThought(RouteTask)
def supervisor(state: TeamState) -> dict:
completed = "\n".join(
f"{k}: {v}" for k, v in state["results"].items()
)
result = supervisor_module(
task=state["task"],
completed_work=completed or "Nothing yet",
available_agents=["researcher", "writer", "reviewer"],
)
if result.is_complete:
return {"status": "done", "current_agent": "none"}
return {
"current_agent": result.next_agent,
"messages": [{"role": "supervisor", "content": f"@{result.next_agent}: {result.sub_task}"}],
}
researcher = ResearchAgent(retriever=my_retriever)
writer_module = dspy.ChainOfThought(WriteContent)
reviewer_module = dspy.ChainOfThought(ReviewContent)
def researcher_node(state: TeamState) -> dict:
task_msg = state["messages"][-1]["content"]
result = researcher(topic=task_msg)
return {
"results": {**state["results"], "research": result.summary},
"messages": [{"role": "researcher", "content": result.summary}],
}
def writer_node(state: TeamState) -> dict:
result = writer_module(
task=state["task"],
research=state["results"].get("research", ""),
)
return {
"results": {**state["results"], "draft": result.output},
"messages": [{"role": "writer", "content": result.output}],
}
def reviewer_node(state: TeamState) -> dict:
result = reviewer_module(
draft=state["results"].get("draft", ""),
task=state["task"],
)
return {
"results": {**state["results"], "review": result.feedback},
"messages": [{"role": "reviewer", "content": result.feedback}],
}
graph = StateGraph(TeamState)
# Add nodes
graph.add_node("supervisor", supervisor)
graph.add_node("researcher", researcher_node)
graph.add_node("writer", writer_node)
graph.add_node("reviewer", reviewer_node)
# Supervisor decides who goes next
graph.add_edge(START, "supervisor")
def route_to_agent(state: TeamState) -> str:
if state["status"] == "done":
return "done"
return state["current_agent"]
graph.add_conditional_edges(
"supervisor",
route_to_agent,
{
"researcher": "researcher",
"writer": "writer",
"reviewer": "reviewer",
"done": END,
},
)
# All agents report back to supervisor
graph.add_edge("researcher", "supervisor")
graph.add_edge("writer", "supervisor")
graph.add_edge("reviewer", "supervisor")
app = graph.compile()
result = app.invoke({
"task": "Write a blog post about the benefits of remote work",
"messages": [],
"current_agent": "",
"results": {},
"status": "in_progress",
})
# Supervisor routes: researcher → writer → reviewer → done
print(result["results"]["draft"])
When one agent passes work directly to another (no supervisor).
class HandoffState(TypedDict):
task: str
context: Annotated[list[str], operator.add] # accumulated context
output: str
def agent_a(state: HandoffState) -> dict:
result = module_a(task=state["task"])
return {"context": [f"Agent A found: {result.output}"]}
def agent_b(state: HandoffState) -> dict:
full_context = "\n".join(state["context"])
result = module_b(task=state["task"], context=full_context)
return {"context": [f"Agent B added: {result.output}"]}
def agent_c(state: HandoffState) -> dict:
full_context = "\n".join(state["context"])
result = module_c(task=state["task"], context=full_context)
return {"output": result.output}
graph = StateGraph(HandoffState)
graph.add_node("a", agent_a)
graph.add_node("b", agent_b)
graph.add_node("c", agent_c)
graph.add_edge(START, "a")
graph.add_edge("a", "b")
graph.add_edge("b", "c")
graph.add_edge("c", END)
Route to different specialists based on intermediate results:
def route_after_classify(state) -> str:
if state["category"] == "billing":
return "billing_specialist"
elif state["category"] == "technical":
return "tech_specialist"
return "general_agent"
graph.add_conditional_edges("classifier", route_after_classify, {
"billing_specialist": "billing",
"tech_specialist": "tech",
"general_agent": "general",
})
Fan out to multiple agents simultaneously and merge results.
from langgraph.constants import Send
class ParallelState(TypedDict):
task: str
subtasks: list[str]
results: Annotated[list[dict], operator.add]
final_output: str
def split_task(state: ParallelState) -> list:
"""Fan out subtasks to worker agents."""
return [Send("worker", {"task": state["task"], "subtask": st}) for st in state["subtasks"]]
def worker(state: dict) -> dict:
"""Each worker handles one subtask."""
worker_module = dspy.ChainOfThought("task, subtask -> result")
result = worker_module(task=state["task"], subtask=state["subtask"])
return {"results": [{"subtask": state["subtask"], "result": result.result}]}
def merge_results(state: ParallelState) -> dict:
"""Combine all worker results into a final output."""
merger = dspy.ChainOfThought("task, partial_results -> final_output")
partial = "\n".join(f"- {r['subtask']}: {r['result']}" for r in state["results"])
result = merger(task=state["task"], partial_results=partial)
return {"final_output": result.final_output}
graph = StateGraph(ParallelState)
graph.add_node("worker", worker)
graph.add_node("merge", merge_results)
graph.add_conditional_edges(START, split_task)
graph.add_edge("worker", "merge")
graph.add_edge("merge", END)
Pause before agents take critical actions.
from langgraph.checkpoint.memory import MemorySaver
checkpointer = MemorySaver()
# Interrupt before any agent that takes external actions
app = graph.compile(
checkpointer=checkpointer,
interrupt_before=["execute_action", "send_email", "update_database"],
)
config = {"configurable": {"thread_id": "task-001"}}
# Run until interrupt
result = app.invoke(input_state, config)
# -> Pauses before "execute_action" node
# Human reviews the proposed action in result state
print(result["proposed_action"])
# If approved, resume from checkpoint
result = app.invoke(None, config)
Optimize each agent's prompts independently first:
def researcher_metric(example, prediction, trace=None):
"""Are the research findings relevant and complete?"""
judge = dspy.Predict(JudgeResearch)
return judge(topic=example.topic, findings=prediction.summary).is_good
optimizer = dspy.MIPROv2(metric=researcher_metric, auto="light")
optimized_researcher = optimizer.compile(researcher, trainset=research_trainset)
Then optimize all agents together with a team-level metric:
def team_metric(example, prediction, trace=None):
"""Is the final output high quality?"""
judge = dspy.Predict(JudgeOutput)
return judge(
task=example.task,
expected=example.output,
actual=prediction.final_output,
).is_good
# Create a module that wraps the full team
class TeamModule(dspy.Module):
def __init__(self):
self.supervisor = supervisor_module
self.researcher = optimized_researcher
self.writer = writer_module
self.reviewer = reviewer_module
def forward(self, task):
# Run the LangGraph app
result = app.invoke({"task": task, "messages": [], "current_agent": "", "results": {}, "status": "in_progress"})
return dspy.Prediction(final_output=result["results"].get("draft", ""))
optimizer = dspy.MIPROv2(metric=team_metric, auto="medium")
optimized_team = optimizer.compile(TeamModule(), trainset=team_trainset)
Multi-agent adds orchestration complexity. Consider simpler alternatives first:
dspy.ReAct agent (see /ai-taking-actions). A single agent with 5 tools is simpler than 3 agents with 2 tools each./ai-building-pipelines).dspy.Predict call with no tools or state, you do not need agents. Use a multi-step DSPy module instead.Use multi-agent when you genuinely need dynamic routing (supervisor decides who goes next), parallel execution (fan-out to multiple specialists), or human-in-the-loop checkpoints between steps.
route_to_agent). DSPy modules should only handle the reasoning each agent does — classify, research, write, review. If forward() contains if agent == "writer" branching, move that logic to LangGraph edges.TypedDict with 15+ fields makes the graph hard to debug and wastes context. Keep the shared state minimal — task, messages, results, status — and let agents pass specifics through the results dict.max_steps counter to the state and a check in the supervisor that forces is_complete = True after N iterations (typically 5-10).dspy.Parallel when it should use LangGraph Send(). dspy.Parallel is for independent LM calls within a single module. For parallel agents with different roles, tools, and state, use LangGraph's Send() pattern — it gives you proper state management, error handling, and the ability to interrupt individual agents.Install any skill:
npx skills add lebsral/DSPy-Programming-not-prompting-LMs-skills --skill <name>
/ai-taking-actions/ai-building-pipelines/ai-building-chatbots/ai-improving-accuracy/dspy-react/ai-do if you do not have it — it routes any AI problem to the right skill and is the fastest way to work: npx skills add lebsral/DSPy-Programming-not-prompting-LMs-skills --skill ai-dotools
See what is happening during optimizer.compile() instead of waiting blind. Use when you want to watch optimization progress, see scores as they come in, know if your optimizer is working, check if optimization is stuck, understand why optimization is taking too long, get live progress during compile, monitor convergence, detect overfitting during optimization, interpret optimization results, or pick the right tool for watching optimization. Also used for optimizer progress bar, is my optimizer doing anything, optimization seems stuck, how long will optimization take, watch GEPA run, watch MIPROv2 run, live optimization dashboard, optimizer not improving, scores not going up, optimization taking forever, see what optimizer is doing, debug slow optimization, optimization visibility, optimizer metrics, track compile progress, optimization observability.
testing
Use when you want the highest-quality prompt optimization DSPy offers — jointly optimizes instructions and few-shot demos, with auto=light/medium/heavy presets. Common scenarios - you want the best possible accuracy from prompt optimization, jointly tuning instructions and few-shot demonstrations, using auto presets for different compute budgets, or when COPRO or BootstrapFewShot alone are not reaching your accuracy target. Related - ai-improving-accuracy, dspy-copro, dspy-bootstrap-few-shot. Also used for dspy.MIPROv2, best DSPy optimizer, highest quality optimization, auto=light medium heavy, joint instruction and demo optimization, most powerful prompt optimizer, MIPROv2 vs COPRO vs BootstrapFewShot, which optimizer should I use, state of the art prompt optimization, when to use MIPROv2, optimize both instructions and examples, heavy optimization for production, best optimizer for accuracy.
testing
Use LangWatch for DSPy auto-tracing and real-time optimizer progress. Use when you want to set up LangWatch, langwatch.dspy.init, auto-tracing DSPy, real-time optimization dashboard, optimizer progress tracking, app.langwatch.ai, or DSPy optimizer dashboard. Also used for langwatch setup, pip install langwatch, langwatch trace, optimizer progress, real-time optimization, watch optimizer run, LangWatch self-hosted, langwatch docker, langwatch vs langtrace, langwatch autotrack_dspy.
data-ai
Use when you want to optimize instructions without few-shot examples — a lightweight alternative to COPRO when you do not have or do not want to use demonstrations. Common scenarios - optimizing instructions when you do not have or do not want to use few-shot demonstrations, lightweight instruction search as a first step, tasks where examples in the prompt confuse the model, or when you want fast instruction optimization without the cost of COPRO. Related - ai-improving-accuracy, dspy-copro, dspy-miprov2. Also used for dspy.GEPA, instruction optimization without demos, lightweight prompt optimization, optimize instructions only, no few-shot examples needed, GEPA vs COPRO, quick instruction search, when demonstrations hurt performance, zero-shot optimization, instruction-only optimizer, simplest instruction tuner, fast prompt optimization, skip few-shot and just tune instructions, optimize Pydantic field descriptions, GEPA structured output, GEPA does not optimize field desc.