.claude/skills/streaming/SKILL.md
Server-Sent Events (SSE) streaming for Claude API with support for text, tool use, and extended thinking. Activate for real-time responses, stream handling, and progressive output.
npx skillsauth add markus41/claude streamingInstall this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
3 of 9 scanners reported clean
Some scanners were skipped, did not run, or reported a non-clean status. Review each row below.
Implement real-time streaming responses from Claude API using Server-Sent Events (SSE).
message_start
→ content_block_start
→ content_block_delta (repeated)
→ content_block_stop
→ (more blocks...)
→ message_delta
→ message_stop
import anthropic
client = anthropic.Anthropic()
with client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=1024,
messages=[{"role": "user", "content": "Write a short story."}]
) as stream:
for text in stream.text_stream:
print(text, end="", flush=True)
with client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=1024,
messages=[{"role": "user", "content": "Hello"}]
) as stream:
for event in stream:
if event.type == "content_block_delta":
if event.delta.type == "text_delta":
print(event.delta.text, end="")
elif event.delta.type == "input_json_delta":
# Tool input (accumulate, don't parse yet!)
tool_input_buffer += event.delta.partial_json
elif event.type == "content_block_stop":
# Now safe to parse tool input
if tool_input_buffer:
tool_input = json.loads(tool_input_buffer)
import Anthropic from '@anthropic-ai/sdk';
const client = new Anthropic();
const stream = client.messages.stream({
model: 'claude-sonnet-4-20250514',
max_tokens: 1024,
messages: [{ role: 'user', content: 'Write a story.' }]
});
for await (const event of stream) {
if (event.type === 'content_block_delta' &&
event.delta.type === 'text_delta') {
process.stdout.write(event.delta.text);
}
}
const finalMessage = await stream.finalMessage();
| Event | When | Data |
|-------|------|------|
| message_start | Beginning | Message metadata |
| content_block_start | Block begins | Block type, index |
| content_block_delta | Content chunk | Delta content |
| content_block_stop | Block ends | - |
| message_delta | Message update | Stop reason, usage |
| message_stop | Complete | - |
| Delta Type | Content | When |
|-----------|---------|------|
| text_delta | .text | Text content |
| input_json_delta | .partial_json | Tool input |
| thinking_delta | .thinking | Extended thinking |
| signature_delta | .signature | Thinking signature |
# WRONG - Will fail on partial JSON!
for event in stream:
if event.delta.type == "input_json_delta":
tool_input = json.loads(event.delta.partial_json) # FAILS!
# CORRECT - Accumulate then parse
tool_json_buffer = ""
for event in stream:
if event.delta.type == "input_json_delta":
tool_json_buffer += event.delta.partial_json
elif event.type == "content_block_stop":
if tool_json_buffer:
tool_input = json.loads(tool_json_buffer) # Safe now!
tool_json_buffer = ""
def stream_with_tools(client, messages, tools):
current_block = None
tool_input_buffer = ""
with client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=4096,
messages=messages,
tools=tools
) as stream:
for event in stream:
if event.type == "content_block_start":
current_block = event.content_block
tool_input_buffer = ""
elif event.type == "content_block_delta":
if event.delta.type == "text_delta":
yield {"type": "text", "content": event.delta.text}
elif event.delta.type == "input_json_delta":
tool_input_buffer += event.delta.partial_json
elif event.type == "content_block_stop":
if current_block.type == "tool_use":
yield {
"type": "tool_call",
"id": current_block.id,
"name": current_block.name,
"input": json.loads(tool_input_buffer)
}
thinking_content = ""
signature = ""
with client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=16000,
thinking={"type": "enabled", "budget_tokens": 10000},
messages=[{"role": "user", "content": "Solve this complex problem..."}]
) as stream:
for event in stream:
if event.type == "content_block_delta":
if event.delta.type == "thinking_delta":
thinking_content += event.delta.thinking
# Optionally display thinking in UI
elif event.delta.type == "signature_delta":
signature = event.delta.signature
elif event.delta.type == "text_delta":
print(event.delta.text, end="")
import time
RETRIABLE_ERRORS = [529, 429, 500, 502, 503]
def stream_with_retry(client, **kwargs):
max_retries = 3
base_delay = 1
for attempt in range(max_retries):
try:
with client.messages.stream(**kwargs) as stream:
for event in stream:
yield event
return
except anthropic.APIStatusError as e:
if e.status_code in RETRIABLE_ERRORS and attempt < max_retries - 1:
delay = base_delay * (2 ** attempt)
time.sleep(delay)
else:
raise
# CRITICAL: Check for error events even at HTTP 200!
for event in stream:
if event.type == "error":
if event.error.type == "overloaded_error":
# Retry with backoff
pass
import httpx
# Proper timeout configuration
http_client = httpx.Client(
timeout=httpx.Timeout(
connect=10.0, # Connection timeout
read=120.0, # Read timeout (long for streaming!)
write=30.0, # Write timeout
pool=30.0 # Pool timeout
)
)
client = anthropic.Anthropic(http_client=http_client)
http_client = httpx.Client(
limits=httpx.Limits(
max_keepalive_connections=20,
max_connections=100,
keepalive_expiry=30.0
)
)
async def stream_to_ui(websocket, prompt):
"""Stream Claude response to WebSocket client"""
async with client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=4096,
messages=[{"role": "user", "content": prompt}]
) as stream:
async for text in stream.text_stream:
await websocket.send_json({
"type": "chunk",
"content": text
})
await websocket.send_json({
"type": "complete",
"usage": stream.get_final_message().usage
})
development
Enhanced plan-authoring skill with Pre-Writing context gathering, task metadata, non-TDD templates, Red Flags, telemetry, and an automated plan linter. Use when you have a spec or requirements for a multi-step task, before touching code.
tools
Documentation intelligence engine with graph-based API docs, algorithm library, and drift detection
tools
Ultraplan cloud planning — kick off a plan in the cloud from your terminal, review and revise in the browser, then execute remotely or send back to CLI
tools
--- name: mcp description: Configure MCP servers for Claude Code — stdio vs HTTP, authentication, Tools/Resources/Prompts distinction, channels (CI webhook, mobile relay, Discord bridge, fakechat), and cost of always-loaded tools. Use this skill whenever adding an MCP server, debugging connection issues, choosing between MCP Tools vs Prompts vs Resources, installing channel servers, or managing .mcp.json. Triggers on: "MCP server", "mcp config", "add Obsidian MCP", "install context7", "channels"