skills/dspy-streaming/SKILL.md
Use when you need to stream LM output tokens to a frontend in real time — progressive responses, typing indicators, server-sent events, or WebSocket feeds. Common scenarios - streaming DSPy responses to a React UI, FastAPI SSE endpoint with DSPy, showing AI typing in a chat interface, streaming multi-field outputs, streaming ReAct agent loop progress, or handling duplicate fields with predict_name. Related - ai-serving-apis, ai-building-chatbots, dspy-react, dspy-utils. Also used for dspy.streamify, dspy.streaming, StreamListener, StreamResponse, StatusMessage, stream tokens from DSPy, real-time AI output, server-sent events with DSPy, progressive response, show AI typing, streaming DSPy to frontend, async streaming generator, sync streaming fallback, multi-field streaming, allow_reuse streaming, predict_name streaming, streaming ReAct agent, FastAPI SSE DSPy.
npx skillsauth add lebsral/dspy-programming-not-prompting-lms-skills dspy-streamingInstall this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
3 of 9 scanners reported clean
Some scanners were skipped, did not run, or reported a non-clean status. Review each row below.
Guide the user through streaming DSPy module outputs token-by-token to frontends, APIs, or CLI interfaces using dspy.streamify() and StreamListener.
dspy.streamify() wraps any DSPy module and returns an async (or sync) generator that yields chunks as the LM produces tokens. StreamListener lets you target specific output fields. The stream yields three chunk types -- StreamResponse (partial tokens), StatusMessage (progress updates), and Prediction (final result).
| Stream when... | Do NOT stream when... | |----------------|----------------------| | Building a chat UI that shows typing | Batch processing many inputs | | SSE/WebSocket endpoint for real-time display | Running optimization or evaluation | | Long-form generation where users wait | Output is short (< 50 tokens) | | ReAct agent loop where you show each step | Pipeline stages that feed into each other |
import dspy
from dspy.streaming import streamify, StreamListener
lm = dspy.LM("openai/gpt-4o-mini") # or "anthropic/claude-sonnet-4-5-20250929", etc.
dspy.configure(lm=lm)
# Your existing module
qa = dspy.ChainOfThought("question -> answer")
# Create a listener for a specific output field
answer_listener = StreamListener(signature_field_name="answer")
# Wrap the module for streaming
streaming_qa = streamify(
qa,
stream_listeners=[answer_listener],
)
# Consume the async generator
async for chunk in streaming_qa(question="What is DSPy?"):
if hasattr(chunk, "answer"):
print(chunk.answer, end="", flush=True)
The stream yields three distinct types:
async for chunk in streaming_qa(question="..."):
if isinstance(chunk, dspy.streaming.StreamResponse):
# Partial tokens for a field -- the main content you display
print(chunk.answer, end="", flush=True)
elif isinstance(chunk, dspy.streaming.StatusMessage):
# Progress updates -- useful for UI status indicators
print(f"[Status: {chunk.message}]")
elif isinstance(chunk, dspy.Prediction):
# Final complete result -- same as non-streaming output
final_answer = chunk.answer
Key behavior:
StreamResponse chunks contain partial text for the field(s) you are listening toStatusMessage appears during tool calls, retries, or multi-step processingPrediction appears last (unless include_final_prediction_in_output_stream=False)Stream multiple output fields simultaneously:
class Summarize(dspy.Signature):
"""Summarize the document and extract key points."""
document: str = dspy.InputField()
summary: str = dspy.OutputField()
key_points: str = dspy.OutputField()
summarizer = dspy.Predict(Summarize)
# Listen to both fields
summary_listener = StreamListener(signature_field_name="summary")
points_listener = StreamListener(signature_field_name="key_points")
streaming_summarizer = streamify(
summarizer,
stream_listeners=[summary_listener, points_listener],
)
async for chunk in streaming_summarizer(document="..."):
if hasattr(chunk, "summary"):
# Tokens for the summary field
display_summary(chunk.summary)
elif hasattr(chunk, "key_points"):
# Tokens for the key_points field
display_points(chunk.key_points)
Note: Fields stream sequentially in declaration order. The LM generates summary fully before starting key_points.
For agents, use allow_reuse=True so the listener captures output across multiple reasoning iterations:
agent = dspy.ReAct("question -> answer", tools=[search, lookup])
# allow_reuse=True is critical for agents -- the agent generates
# multiple "answer" attempts across iterations
answer_listener = StreamListener(
signature_field_name="answer",
allow_reuse=True,
)
streaming_agent = streamify(
agent,
stream_listeners=[answer_listener],
status_message_provider=lambda step: f"Agent step {step}...",
)
async for chunk in streaming_agent(question="..."):
if isinstance(chunk, dspy.streaming.StatusMessage):
show_progress(chunk.message)
elif hasattr(chunk, "answer"):
show_answer_token(chunk.answer)
When a module has multiple predictors with the same output field name, use predict_name to target a specific one:
class Pipeline(dspy.Module):
def __init__(self):
self.draft = dspy.ChainOfThought("topic -> answer")
self.refine = dspy.ChainOfThought("draft_answer -> answer")
def forward(self, topic):
draft = self.draft(topic=topic)
return self.refine(draft_answer=draft.answer)
pipeline = Pipeline()
# Only stream the final refinement step, not the draft
final_listener = StreamListener(
signature_field_name="answer",
predict_name="refine", # matches self.refine attribute name
)
streaming_pipeline = streamify(pipeline, stream_listeners=[final_listener])
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import dspy
from dspy.streaming import streamify, StreamListener
app = FastAPI()
qa = dspy.ChainOfThought("question -> answer")
answer_listener = StreamListener(signature_field_name="answer")
streaming_qa = streamify(qa, stream_listeners=[answer_listener])
@app.get("/ask")
async def ask(question: str):
async def event_stream():
async for chunk in streaming_qa(question=question):
if hasattr(chunk, "answer"):
yield f"data: {chunk.answer}\n\n"
elif isinstance(chunk, dspy.Prediction):
yield f"data: [DONE]\n\n"
return StreamingResponse(event_stream(), media_type="text/event-stream")
For non-async contexts (scripts, notebooks), set async_streaming=False:
streaming_qa = streamify(
qa,
stream_listeners=[answer_listener],
async_streaming=False, # Returns a sync generator
)
# Use regular for loop instead of async for
for chunk in streaming_qa(question="What is DSPy?"):
if hasattr(chunk, "answer"):
print(chunk.answer, end="", flush=True)
async_streaming=False in non-async code. If you use a regular for loop with an async generator, you get TypeError: 'async_generator' object is not iterable. Always set async_streaming=False for sync contexts.[[ ## field_name ## ]]) before it can emit tokens. First chunk arrives slightly delayed.allow_reuse=True for agents. Without it, StreamListener only captures the first iteration. ReAct and CodeAct agents need allow_reuse=True to stream across multiple reasoning cycles.include_final_prediction_in_output_stream=True is the default. The last item in the stream is always a Prediction object unless you explicitly disable it. Check for both StreamResponse and Prediction types.streamify() already returns a callable that produces a generator. Do not wrap it in another async function -- just call it directly and iterate.Install any skill:
npx skills add lebsral/DSPy-Programming-not-prompting-LMs-skills --skill <name>
/ai-serving-apis/ai-building-chatbots/dspy-react/dspy-async/dspy-utils/ai-do if you do not have it -- it routes any AI problem to the right skill and is the fastest way to work: npx skills add lebsral/DSPy-Programming-not-prompting-LMs-skills --skill ai-dotools
See what is happening during optimizer.compile() instead of waiting blind. Use when you want to watch optimization progress, see scores as they come in, know if your optimizer is working, check if optimization is stuck, understand why optimization is taking too long, get live progress during compile, monitor convergence, detect overfitting during optimization, interpret optimization results, or pick the right tool for watching optimization. Also used for optimizer progress bar, is my optimizer doing anything, optimization seems stuck, how long will optimization take, watch GEPA run, watch MIPROv2 run, live optimization dashboard, optimizer not improving, scores not going up, optimization taking forever, see what optimizer is doing, debug slow optimization, optimization visibility, optimizer metrics, track compile progress, optimization observability.
testing
Use when you want the highest-quality prompt optimization DSPy offers — jointly optimizes instructions and few-shot demos, with auto=light/medium/heavy presets. Common scenarios - you want the best possible accuracy from prompt optimization, jointly tuning instructions and few-shot demonstrations, using auto presets for different compute budgets, or when COPRO or BootstrapFewShot alone are not reaching your accuracy target. Related - ai-improving-accuracy, dspy-copro, dspy-bootstrap-few-shot. Also used for dspy.MIPROv2, best DSPy optimizer, highest quality optimization, auto=light medium heavy, joint instruction and demo optimization, most powerful prompt optimizer, MIPROv2 vs COPRO vs BootstrapFewShot, which optimizer should I use, state of the art prompt optimization, when to use MIPROv2, optimize both instructions and examples, heavy optimization for production, best optimizer for accuracy.
testing
Use LangWatch for DSPy auto-tracing and real-time optimizer progress. Use when you want to set up LangWatch, langwatch.dspy.init, auto-tracing DSPy, real-time optimization dashboard, optimizer progress tracking, app.langwatch.ai, or DSPy optimizer dashboard. Also used for langwatch setup, pip install langwatch, langwatch trace, optimizer progress, real-time optimization, watch optimizer run, LangWatch self-hosted, langwatch docker, langwatch vs langtrace, langwatch autotrack_dspy.
data-ai
Use when you want to optimize instructions without few-shot examples — a lightweight alternative to COPRO when you do not have or do not want to use demonstrations. Common scenarios - optimizing instructions when you do not have or do not want to use few-shot demonstrations, lightweight instruction search as a first step, tasks where examples in the prompt confuse the model, or when you want fast instruction optimization without the cost of COPRO. Related - ai-improving-accuracy, dspy-copro, dspy-miprov2. Also used for dspy.GEPA, instruction optimization without demos, lightweight prompt optimization, optimize instructions only, no few-shot examples needed, GEPA vs COPRO, quick instruction search, when demonstrations hurt performance, zero-shot optimization, instruction-only optimizer, simplest instruction tuner, fast prompt optimization, skip few-shot and just tune instructions, optimize Pydantic field descriptions, GEPA structured output, GEPA does not optimize field desc.