Claude Streaming API Guide: SSE Implementation for Real-Time Responses
Streaming lets your application display Claude's response token-by-token as it is generated, rather than waiting for the entire response. This dramatically improves perceived performance and user experience. This guide covers Server-Sent Events (SSE) implementation with the Claude API in Python, Node.js, and raw HTTP.
Why Use Streaming?
- Faster perceived response — Users see the first token in under a second instead of waiting 5-30 seconds for a complete response.
- Better UX — Real-time output creates a natural, conversational feel similar to watching someone type.
- Memory efficiency — Process tokens incrementally instead of buffering the entire response in memory.
- Early cancellation — Users can stop generation early if the response is going in the wrong direction, saving tokens and cost.
How Claude Streaming Works
Claude uses Server-Sent Events (SSE) for streaming. When you set "stream": true in your request, the API responds with a series of events rather than a single JSON object. Each event contains a chunk of the response:
event: message_start
data: {"type":"message_start","message":{"id":"msg_...","type":"message","role":"assistant",...}}
event: content_block_start
data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}}
event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello"}}
event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":" world"}}
event: content_block_stop
data: {"type":"content_block_stop","index":0}
event: message_delta
data: {"type":"message_delta","delta":{"stop_reason":"end_turn"},"usage":{"output_tokens":12}}
event: message_stop
data: {"type":"message_stop"}
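To make the event sequence above concrete, here is a minimal sketch of how a client might parse raw SSE text and reassemble the response from the text_delta chunks. It uses only the standard library. The helper names parse_sse and collect_text are hypothetical (they are not part of any SDK), and the sketch assumes every data payload is JSON, as in the events shown above.

```python
import json


def parse_sse(raw):
    """Yield (event_name, payload) pairs from raw SSE text.

    Hypothetical helper for illustration. Events are normally separated
    by a blank line; a new "event:" line also closes any pending event.
    """
    event_name, data_lines = None, []
    for line in raw.splitlines():
        if line.startswith("event:"):
            if data_lines:  # previous event was not closed by a blank line
                yield event_name, json.loads("\n".join(data_lines))
                data_lines = []
            event_name = line[len("event:"):].strip()
        elif line.startswith("data:"):
            data_lines.append(line[len("data:"):].strip())
        elif not line.strip() and data_lines:
            # A blank line terminates the current event
            yield event_name, json.loads("\n".join(data_lines))
            event_name, data_lines = None, []
    if data_lines:  # flush a trailing event with no final blank line
        yield event_name, json.loads("\n".join(data_lines))


def collect_text(raw):
    """Concatenate the text of every text_delta in the stream."""
    parts = []
    for name, payload in parse_sse(raw):
        if name == "content_block_delta":
            delta = payload.get("delta", {})
            if delta.get("type") == "text_delta":
                parts.append(delta.get("text", ""))
    return "".join(parts)


# Abbreviated copy of the event sequence shown above
SAMPLE = (
    'event: content_block_delta\n'
    'data: {"type":"content_block_delta","index":0,'
    '"delta":{"type":"text_delta","text":"Hello"}}\n\n'
    'event: content_block_delta\n'
    'data: {"type":"content_block_delta","index":0,'
    '"delta":{"type":"text_delta","text":" world"}}\n\n'
    'event: message_delta\n'
    'data: {"type":"message_delta","delta":{"stop_reason":"end_turn"},'
    '"usage":{"output_tokens":12}}\n\n'
    'event: message_stop\n'
    'data: {"type":"message_stop"}\n\n'
)

if __name__ == "__main__":
    print(collect_text(SAMPLE))
```

In practice the SDK helpers in the following sections do this parsing for you; a hand-written parser like this is mainly useful when working with the raw HTTP response.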
Python Implementation
import anthropic

client = anthropic.Anthropic()  # Reads ANTHROPIC_BASE_URL and API key from env

# Basic streaming
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Write a short poem about APIs"}],
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
    print()  # Newline at end

    # Get the final message with usage info
    final_message = stream.get_final_message()

print(f"Tokens used: {final_message.usage.input_tokens} in, {final_message.usage.output_tokens} out")
Async Python Streaming
import anthropic
import asyncio

async def stream_response():
    client = anthropic.AsyncAnthropic()
    async with client.messages.stream(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        messages=[{"role": "user", "content": "Explain streaming APIs"}],
    ) as stream:
        async for text in stream.text_stream:
            print(text, end="", flush=True)

asyncio.run(stream_response())
Node.js / TypeScript Implementation
import Anthropic from '@anthropic-ai/sdk';

const client = new Anthropic();

async function streamResponse() {
  const stream = await client.messages.stream({
    model: 'claude-sonnet-4-20250514',
    max_tokens: 1024,
    messages: [{ role: 'user', content: 'Explain streaming APIs' }],
  });

  // Print each text delta as it arrives
  for await (const event of stream) {
    if (
      event.type === 'content_block_delta' &&
      event.delta.type === 'text_delta'
    ) {
      process.stdout.write(event.delta.text);
    }
  }

  const finalMessage = await stream.finalMessage();
  console.log('\nTokens:', finalMessage.usage);
}

streamResponse();
Raw HTTP with curl
curl https://claude4u.com/antigravity/v1/messages \
-H "x-api-key: $ANTHROPIC_API_KEY" \
-H "content-type: application/json" \
-H "anthropic-version: 2023-06-01" \
--no-buffer \
-d '{
"model": "claude-sonnet-4-20250514",
"max_tokens": 1024,
"stream": true,
"messages": [{"role": "user", "content": "Hello, Claude!"}]
}'
If you use a relay, replace the URL and key above with your ANTHROPIC_BASE_URL and your relay API key.
Building a Streaming Web Endpoint
Here is how to proxy Claude's streaming response through your own Express.js server:
import Anthropic from '@anthropic-ai/sdk';
import express from 'express';

const app = express();
const client = new Anthropic();

// Parse JSON request bodies; without this, req.body is undefined
app.use(express.json());

app.post('/api/chat', async (req, res) => {
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');

  try {
    const stream = await client.messages.stream({
      model: 'claude-sonnet-4-20250514',
      max_tokens: 1024,
      messages: req.body.messages,
    });

    // Forward every event to the browser as an SSE data line
    for await (const event of stream) {
      res.write(`data: ${JSON.stringify(event)}\n\n`);
    }
    res.write('data: [DONE]\n\n');
  } catch (err) {
    console.error('Stream failed:', err);
  } finally {
    // Always close the response, even if the upstream stream failed
    res.end();
  }
});

app.listen(3000);
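A client of this endpoint receives lines of the form "data: <json>", followed by a final "data: [DONE]" sentinel. The consuming side might be sketched as follows, assuming the caller already has an iterable of response lines (str or bytes), such as the line iterator most HTTP libraries provide. iter_proxy_events is a hypothetical helper name, not part of any SDK.

```python
import json


def iter_proxy_events(lines):
    """Yield decoded event dicts from the proxy's "data: ..." lines.

    Hypothetical helper: stops at the "[DONE]" sentinel written by the
    endpoint and skips blank separator lines and any non-data lines.
    """
    for raw in lines:
        line = raw.decode("utf-8") if isinstance(raw, bytes) else raw
        line = line.strip()
        if not line.startswith("data:"):
            continue  # blank separators, comments, other SSE fields
        payload = line[len("data:"):].strip()
        if payload == "[DONE]":
            return  # end of stream; ignore anything after the sentinel
        yield json.loads(payload)


if __name__ == "__main__":
    sample = [
        b'data: {"type":"content_block_delta","delta":{"type":"text_delta","text":"Hi"}}',
        b"",
        b"data: [DONE]",
    ]
    for event in iter_proxy_events(sample):
        print(event["type"])
```

Stopping at the sentinel rather than waiting for the connection to close lets the client finish promptly even if the socket is kept alive.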
Handling Stream Interruptions
Network issues can interrupt streaming connections. Implement proper cleanup:
# Python: handle Ctrl+C (SIGINT) so the stream can be cancelled cleanly
import signal

def handle_interrupt(signum, frame):
    print("\nStream cancelled by user")
    raise KeyboardInterrupt

signal.signal(signal.SIGINT, handle_interrupt)
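Cleanup usually also means keeping whatever text arrived before the interruption. The sketch below shows one way to do that, assuming the stream is exposed as a plain iterable of text chunks (for example, the SDK's text_stream). The names consume_stream and flaky_chunks are hypothetical and exist only for this illustration.

```python
def consume_stream(chunks):
    """Collect text chunks, returning (partial_text, interrupted).

    Hypothetical helper: a cancelled or dropped stream still returns
    the text received so far instead of losing it.
    """
    parts = []
    interrupted = False
    try:
        for chunk in chunks:
            parts.append(chunk)
    except (KeyboardInterrupt, ConnectionError):
        # User cancelled (SIGINT) or the network dropped mid-stream
        interrupted = True
    finally:
        # Release the underlying iterator if it supports close()
        close = getattr(chunks, "close", None)
        if callable(close):
            close()
    return "".join(parts), interrupted


def flaky_chunks():
    """Stand-in for a real stream that fails partway through."""
    yield "Hel"
    yield "lo"
    raise ConnectionError("connection reset")


if __name__ == "__main__":
    text, interrupted = consume_stream(flaky_chunks())
    print(repr(text), interrupted)
```

Returning the interrupted flag alongside the text lets the caller decide whether to show the partial answer, retry, or both.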
Streaming Best Practices
- Always handle all event types: Do not assume only content_block_delta events exist. Handle message_start, message_delta, and error events.
- Capture usage data: Token usage is reported in the message_delta event at the end of the stream. Do not discard this for cost tracking.
- Implement timeouts: Set connection timeouts to detect stalled streams.
- Buffer management: Flush output buffers to prevent batching of streamed tokens.
- Use a relay service: Services like claude4u.com optimize streaming connections and handle reconnection logic.
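The first two practices can be sketched as a small event handler that takes already-decoded event dicts, accumulates text, and records usage. StreamState and handle_event are hypothetical names. The input_tokens field on message_start and the shape of the error event are assumptions based on the public API's event format, since the examples in this guide elide them.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class StreamState:
    """Accumulated result of a stream (hypothetical container)."""
    text_parts: List[str] = field(default_factory=list)
    input_tokens: int = 0
    output_tokens: int = 0
    stop_reason: Optional[str] = None
    done: bool = False

    @property
    def text(self) -> str:
        return "".join(self.text_parts)


def handle_event(state: StreamState, event: dict) -> None:
    """Update state from one decoded stream event."""
    etype = event.get("type")
    if etype == "message_start":
        # Assumption: input token count is reported under message.usage
        usage = event.get("message", {}).get("usage", {})
        state.input_tokens = usage.get("input_tokens", 0)
    elif etype == "content_block_delta":
        delta = event.get("delta", {})
        if delta.get("type") == "text_delta":
            state.text_parts.append(delta.get("text", ""))
    elif etype == "message_delta":
        # Stop reason and output token usage arrive near the end
        state.stop_reason = event.get("delta", {}).get("stop_reason")
        usage = event.get("usage", {})
        state.output_tokens = usage.get("output_tokens", state.output_tokens)
    elif etype == "message_stop":
        state.done = True
    elif etype == "error":
        # Assumption: error events carry an "error" object with a message
        message = event.get("error", {}).get("message", "unknown error")
        raise RuntimeError(f"stream error: {message}")
    # Other event types (content_block_start/stop, keep-alives) are ignored


if __name__ == "__main__":
    state = StreamState()
    for ev in [
        {"type": "content_block_delta", "index": 0,
         "delta": {"type": "text_delta", "text": "Hello"}},
        {"type": "message_delta", "delta": {"stop_reason": "end_turn"},
         "usage": {"output_tokens": 12}},
        {"type": "message_stop"},
    ]:
        handle_event(state, ev)
    print(state.text, state.output_tokens, state.stop_reason)
```

Ignoring unknown event types rather than raising keeps the handler tolerant of event types it does not know about.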