skills/dspy-async/SKILL.md
Use when you need to run DSPy modules asynchronously — FastAPI endpoints, concurrent LM calls, non-blocking execution, or integrating DSPy into async web frameworks. Common scenarios - serving DSPy behind FastAPI or Starlette, running multiple LM calls concurrently with asyncio.gather, non-blocking batch processing, combining async with streaming, or building async agent loops. Related - ai-serving-apis, dspy-parallel, dspy-streaming, dspy-utils. Also used for aforward, acall, async DSPy, await dspy, FastAPI with DSPy async, concurrent DSPy calls, asyncio with DSPy, non-blocking DSPy, async batch processing, semaphore concurrency limit, asyncio.gather DSPy, async web framework DSPy, Starlette DSPy, aiohttp DSPy.
Guide the user through running DSPy modules with async/await for non-blocking execution in web frameworks, concurrent processing, and high-throughput applications.
Every DSPy module supports async execution via aforward() and acall(). These return awaitable coroutines instead of blocking the event loop, making DSPy compatible with async web frameworks (FastAPI, Starlette, aiohttp) and enabling concurrent LM calls with asyncio.gather().
| Use async when... | Use sync when... |
|-------------------|------------------|
| Serving DSPy behind FastAPI/Starlette | Running scripts or notebooks |
| Making concurrent LM calls | Processing one input at a time |
| Building real-time APIs | Running optimization/evaluation |
| Combining with async streaming | Simple CLI tools |
| Integrating with async databases/caches | No event loop in your application |
Every DSPy module has an async variant:
```python
import asyncio
import dspy

lm = dspy.LM("openai/gpt-4o-mini")  # or "anthropic/claude-sonnet-4-5-20250929", etc.
dspy.configure(lm=lm)

qa = dspy.ChainOfThought("question -> answer")

async def ask(question: str):
    # aforward() is the async version of forward()
    result = await qa.aforward(question=question)
    return result.answer

# Run it
answer = asyncio.run(ask("What is DSPy?"))
print(answer)
```
Two async methods:

- `module.aforward(**kwargs)` -- async version of `module.forward()`
- `module.acall(**kwargs)` -- async version of `module(**kwargs)`; it dispatches to `aforward()` the same way `module(...)` dispatches to `forward()`, so either one can be awaited

Run multiple independent LM calls concurrently:
```python
import asyncio
import dspy

lm = dspy.LM("openai/gpt-4o-mini")
dspy.configure(lm=lm)

summarizer = dspy.ChainOfThought("text -> summary")

async def summarize_batch(texts: list[str]):
    # Launch all summarizations concurrently
    tasks = [
        summarizer.aforward(text=text)
        for text in texts
    ]
    results = await asyncio.gather(*tasks)
    return [r.summary for r in results]

texts = ["Article 1...", "Article 2...", "Article 3..."]
summaries = asyncio.run(summarize_batch(texts))
```
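This is much faster than processing the texts one by one. LM calls are I/O-bound (the network round-trip dominates), so the waits overlap. As long as the provider does not throttle you, total time approaches that of the slowest single call rather than the sum of all calls.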
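To see where the speedup comes from, here is a stdlib-only sketch that replaces the LM with a hypothetical `fake_lm_call` coroutine. It stands in for `summarizer.aforward` and sleeps for an assumed fixed latency. Awaiting sequentially pays that latency once per input. `asyncio.gather` overlaps the waits and still returns results in input order.

```python
import asyncio
import time

LATENCY = 0.2  # simulated network round-trip in seconds (assumption)


async def fake_lm_call(text: str) -> str:
    """Hypothetical stand-in for an LM call such as summarizer.aforward()."""
    await asyncio.sleep(LATENCY)  # yields to the event loop while "waiting on the network"
    return f"summary of {text}"


async def sequential(texts: list[str]) -> list[str]:
    # Each call starts only after the previous one has finished
    return [await fake_lm_call(t) for t in texts]


async def concurrent(texts: list[str]) -> list[str]:
    # All calls start at once; gather returns results in input order
    return await asyncio.gather(*(fake_lm_call(t) for t in texts))


def timed(coro_fn, texts):
    start = time.perf_counter()
    results = asyncio.run(coro_fn(texts))
    return results, time.perf_counter() - start


texts = [f"Article {i}" for i in range(5)]
seq_results, seq_time = timed(sequential, texts)
con_results, con_time = timed(concurrent, texts)
print(f"sequential: {seq_time:.2f}s, concurrent: {con_time:.2f}s")
```

With five inputs, the sequential version takes roughly five times the simulated latency and the concurrent version roughly one. Real providers add rate limits, which the semaphore pattern below addresses.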
Serve a module from an async FastAPI endpoint:

```python
from fastapi import FastAPI
from pydantic import BaseModel
import dspy

app = FastAPI()

lm = dspy.LM("openai/gpt-4o-mini")
dspy.configure(lm=lm)

classifier = dspy.Predict("text -> label, confidence: float")

class ClassifyRequest(BaseModel):
    text: str  # read from the JSON request body

@app.post("/classify")
async def classify(req: ClassifyRequest):
    # Non-blocking -- does not hold up other requests
    result = await classifier.aforward(text=req.text)
    return {"label": result.label, "confidence": result.confidence}
```
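Why this matters: an `async def` endpoint runs on the event loop. Calling the module synchronously there (`classifier(text=...)`) would block the loop and stall every other request on that worker until the LM responds. With `aforward()`, the event loop is free to handle other requests while waiting for the LM response.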
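A stdlib-only sketch of that difference follows. A hypothetical background `heartbeat` task stands in for other requests sharing the event loop and ticks every 10 ms. A blocking `time.sleep` stands in for a synchronous `module(...)` call, and `await asyncio.sleep` stands in for `await module.aforward(...)`. Both substitutions are assumptions made so the example runs without an LM.

```python
import asyncio
import time


async def heartbeat(ticks: list[int]) -> None:
    """Stands in for other requests being served on the same event loop."""
    while True:
        await asyncio.sleep(0.01)
        ticks.append(1)


async def measure(handler) -> int:
    ticks: list[int] = []
    hb = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0)  # let the heartbeat task start
    await handler()
    hb.cancel()
    try:
        await hb
    except asyncio.CancelledError:
        pass
    return len(ticks)


async def blocking_handler() -> None:
    time.sleep(0.2)  # like calling module(...) synchronously: the whole loop is stuck


async def non_blocking_handler() -> None:
    await asyncio.sleep(0.2)  # like awaiting module.aforward(...): the loop stays free


blocked_ticks = asyncio.run(measure(blocking_handler))
free_ticks = asyncio.run(measure(non_blocking_handler))
print(f"ticks while blocking: {blocked_ticks}, ticks while awaiting: {free_ticks}")
```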
Prevent overwhelming the LM provider with too many concurrent requests:
```python
import asyncio
import dspy

lm = dspy.LM("openai/gpt-4o-mini")
dspy.configure(lm=lm)

processor = dspy.ChainOfThought("input -> output")

# Limit to 10 concurrent LM calls
semaphore = asyncio.Semaphore(10)

async def process_one(input_text: str):
    async with semaphore:
        return await processor.aforward(input=input_text)

async def process_batch(inputs: list[str]):
    tasks = [process_one(text) for text in inputs]
    return await asyncio.gather(*tasks)

# Even with 1000 inputs, only 10 run concurrently
results = asyncio.run(process_batch(["input"] * 1000))
```
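To check the limit rather than trust it, a stdlib-only sketch can count how many calls are in flight at once. `fake_lm_call` is a hypothetical stand-in for `processor.aforward`, with an assumed 10 ms latency. Here the semaphore is created inside the running coroutine, so it belongs to the event loop that `asyncio.run` creates.

```python
import asyncio

MAX_CONCURRENT = 10


async def fake_lm_call(text: str, state: dict) -> str:
    """Hypothetical stand-in for processor.aforward(); tracks how many calls overlap."""
    state["in_flight"] += 1
    state["peak"] = max(state["peak"], state["in_flight"])
    await asyncio.sleep(0.01)  # simulated network latency
    state["in_flight"] -= 1
    return text.upper()


async def process_batch(inputs: list[str]) -> tuple[list[str], int]:
    semaphore = asyncio.Semaphore(MAX_CONCURRENT)
    state = {"in_flight": 0, "peak": 0}

    async def process_one(text: str) -> str:
        async with semaphore:  # waits here once MAX_CONCURRENT calls are running
            return await fake_lm_call(text, state)

    results = await asyncio.gather(*(process_one(t) for t in inputs))
    return results, state["peak"]


results, peak = asyncio.run(process_batch([f"input {i}" for i in range(200)]))
print(f"{len(results)} results, peak concurrency {peak}")
```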
Combine async execution with streaming output:
```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import dspy
from dspy.streaming import streamify, StreamListener

app = FastAPI()

lm = dspy.LM("openai/gpt-4o-mini")
dspy.configure(lm=lm)

qa = dspy.ChainOfThought("question -> answer")

# Emit partial tokens of the "answer" output field as they arrive
listener = StreamListener(signature_field_name="answer")
streaming_qa = streamify(qa, stream_listeners=[listener])

@app.get("/ask")
async def ask(question: str):
    async def generate():
        async for chunk in streaming_qa(question=question):
            # Partial tokens arrive as StreamResponse objects (text in .chunk);
            # the final dspy.Prediction is yielded last and is skipped here
            if isinstance(chunk, dspy.streaming.StreamResponse):
                yield f"data: {chunk.chunk}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")
```
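One detail to watch with Server-Sent Events: a raw newline inside a `data:` line ends that field, so a chunk containing `\n` would be misread by the client. The SSE format handles multi-line text by emitting one `data:` line per line of text, and clients rejoin them with `\n`. A minimal sketch of a hypothetical `sse_event` helper follows. The `fake_stream` generator is an assumption standing in for the answer chunks of a streamified module.

```python
import asyncio


def sse_event(text: str) -> str:
    """Format text as one SSE event; each line of text gets its own 'data:' field."""
    lines = text.split("\n")
    return "".join(f"data: {line}\n" for line in lines) + "\n"


async def fake_stream():
    """Hypothetical stand-in for the answer chunks of a streamified DSPy module."""
    for piece in ["Async lets ", "one worker\nserve many ", "requests."]:
        await asyncio.sleep(0)
        yield piece


async def generate():
    async for piece in fake_stream():
        yield sse_event(piece)
    yield sse_event("[DONE]")


async def collect() -> list[str]:
    return [event async for event in generate()]


events = asyncio.run(collect())
print("".join(events))
```

Inside the endpoint's `generate()`, the hand-built `f"data: ...\n\n"` strings could be replaced by calls to a helper like this.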
When writing custom modules, implement aforward for async:
```python
import asyncio
import dspy

dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"))

class AsyncPipeline(dspy.Module):
    def __init__(self):
        super().__init__()
        self.classify = dspy.Predict("text -> category")
        self.summarize = dspy.ChainOfThought("text, category -> summary")

    async def aforward(self, text):
        # Run classification (async)
        classification = await self.classify.aforward(text=text)
        # Run summarization with the category (async); it depends on step 1
        result = await self.summarize.aforward(
            text=text,
            category=classification.category,
        )
        return dspy.Prediction(
            category=classification.category,
            summary=result.summary,
        )

# Usage: acall() dispatches to the aforward() defined above
pipeline = AsyncPipeline()
result = asyncio.run(pipeline.acall(text="..."))
```
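The pipeline above awaits its steps one after another because summarization needs the category. When two sub-calls are independent, an `aforward()` can start both and await them together with `asyncio.gather`. Below is a stdlib-only sketch of that shape. `classify`, `extract_keywords`, and `summarize` are hypothetical coroutines standing in for predictor `aforward()` calls, and `Pipeline` is a plain class rather than a `dspy.Module`.

```python
import asyncio
import time


async def classify(text: str) -> str:
    """Hypothetical stand-in for self.classify.aforward(text=...)."""
    await asyncio.sleep(0.1)
    return "news"


async def extract_keywords(text: str) -> list[str]:
    """Hypothetical stand-in for a second, independent predictor."""
    await asyncio.sleep(0.1)
    return sorted(set(text.lower().split()))[:3]


async def summarize(text: str, category: str) -> str:
    """Depends on the category, so it must wait for classify()."""
    await asyncio.sleep(0.1)
    return f"[{category}] {text[:20]}"


class Pipeline:
    async def aforward(self, text: str) -> dict:
        # Independent steps: start both, wait for both
        category, keywords = await asyncio.gather(classify(text), extract_keywords(text))
        # Dependent step: needs the category, so it runs afterwards
        summary = await summarize(text, category)
        return {"category": category, "keywords": keywords, "summary": summary}


start = time.perf_counter()
result = asyncio.run(Pipeline().aforward("DSPy modules support async execution"))
elapsed = time.perf_counter() - start
print(result, f"{elapsed:.2f}s")
```

Three simulated 0.1 s calls finish in about 0.2 s, because only the dependent step has to wait.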
Agents with MCP tools or async tool functions need acall():
```python
import asyncio
import dspy

lm = dspy.LM("openai/gpt-4o-mini")
dspy.configure(lm=lm)

async def async_search(query: str) -> str:
    """Search the web asynchronously."""
    # Your async search implementation
    return "results..."

agent = dspy.ReAct("question -> answer", tools=[async_search])

async def run_agent(question: str):
    # acall() handles async tools automatically
    result = await agent.acall(question=question)
    return result.answer

print(asyncio.run(run_agent("What is DSPy?")))
```
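Why a sync call path is not enough: calling an `async def` function without awaiting it does not run its body. It returns a coroutine object that something must await. The stdlib-only sketch below shows this with a version of the `async_search` tool from above, redefined with a placeholder body so the block runs on its own. How DSPy detects and awaits async tools internally is not shown here.

```python
import asyncio
import inspect


async def async_search(query: str) -> str:
    """Search the web asynchronously (placeholder body for illustration)."""
    await asyncio.sleep(0)
    return f"results for {query!r}"


# A sync caller gets a coroutine object, not the string result
pending = async_search("DSPy")
print(type(pending).__name__)  # coroutine
pending.close()  # discard it cleanly to avoid a "never awaited" warning


# An async caller can await it and get the actual result
async def call_tool() -> str:
    return await async_search("DSPy")


print(inspect.iscoroutinefunction(async_search))
print(asyncio.run(call_tool()))
```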
Common pitfalls:

- Calling `module()` inside async functions instead of `await module.aforward()`. Calling a module synchronously inside an async function blocks the event loop. Always use `aforward()` or `acall()` in async contexts.
- Calling `asyncio.run()` inside an existing event loop. You cannot call `asyncio.run()` from inside an async function, or from a Jupyter notebook cell, since the notebook already runs an event loop. It raises `RuntimeError: asyncio.run() cannot be called from a running event loop`. Use `await` directly instead.
- Unbounded concurrency. `asyncio.gather()` with 1000 tasks hits rate limits immediately. Always add a semaphore when processing large batches.
- Implementing `forward()` but not `aforward()` in custom modules. If your module will be called with `await`, implement `aforward()`. DSPy does not auto-wrap `forward()` into an async version.
- Using `await` inside a sync `forward()`. `await` is only valid inside an `async def`, so this is a `SyntaxError`; put async calls in `aforward()`.
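A quick stdlib-only check of the `asyncio.run()` pitfall: calling it from inside a coroutine fails fast, while awaiting the coroutine directly works. `inner` is a hypothetical stand-in for any async DSPy call.

```python
import asyncio


async def inner() -> str:
    """Hypothetical stand-in for an async DSPy call such as module.acall(...)."""
    await asyncio.sleep(0)
    return "ok"


async def wrong() -> str:
    coro = inner()
    try:
        return asyncio.run(coro)  # not allowed: an event loop is already running
    except RuntimeError as exc:
        coro.close()  # the coroutine never started; close it to avoid a warning
        return str(exc)


async def right() -> str:
    return await inner()  # inside async code, just await


error_message = asyncio.run(wrong())
print(error_message)
print(asyncio.run(right()))
```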
Related skills:

- /ai-serving-apis
- /dspy-parallel
- /dspy-streaming
- /dspy-mcp
- /dspy-utils

Install /ai-do if you do not have it -- it routes any AI problem to the right skill and is the fastest way to work: `npx skills add lebsral/DSPy-Programming-not-prompting-LMs-skills --skill ai-do`