Claude Streaming API Guide: SSE Implementation for Real-Time Responses

Streaming lets your application display Claude's response token by token as it is generated, rather than waiting for the entire response. Output appears as soon as the first tokens are produced, which improves perceived performance and user experience. This guide covers Server-Sent Events (SSE) implementation with the Claude API in Python, Node.js, and raw HTTP.

Why Use Streaming?

How Claude Streaming Works

Claude uses Server-Sent Events (SSE) for streaming. When you set "stream": true in your request, the API responds with a series of typed events rather than a single JSON object. The generated text arrives in content_block_delta events, bracketed by start and stop events for the message and for each content block:

event: message_start
data: {"type":"message_start","message":{"id":"msg_...","type":"message","role":"assistant",...}}

event: content_block_start
data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}}

event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello"}}

event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":" world"}}

event: content_block_stop
data: {"type":"content_block_stop","index":0}

event: message_delta
data: {"type":"message_delta","delta":{"stop_reason":"end_turn"},"usage":{"output_tokens":12}}

event: message_stop
data: {"type":"message_stop"}
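To make the event format above concrete, here is a minimal sketch of parsing raw SSE text and joining the text deltas. The helper names iter_sse_events and collect_text are made up for this illustration, and the parser assumes frames are separated by a blank line with "\n" line endings, as in the listing above. A production client would normally rely on the SDK instead.

```python
import json

def iter_sse_events(raw: str):
    """Yield (event_name, payload_dict) pairs from raw SSE text."""
    for frame in raw.strip().split("\n\n"):
        event_name, data_lines = None, []
        for line in frame.splitlines():
            if line.startswith("event:"):
                event_name = line[len("event:"):].strip()
            elif line.startswith("data:"):
                data_lines.append(line[len("data:"):].strip())
        if data_lines:
            yield event_name, json.loads("\n".join(data_lines))

def collect_text(raw: str) -> str:
    """Concatenate the text of every text_delta in the stream."""
    parts = []
    for name, payload in iter_sse_events(raw):
        if name == "content_block_delta" and payload["delta"]["type"] == "text_delta":
            parts.append(payload["delta"]["text"])
    return "".join(parts)

# A shortened copy of the event listing above
SAMPLE = (
    'event: content_block_delta\n'
    'data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello"}}\n'
    '\n'
    'event: content_block_delta\n'
    'data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":" world"}}\n'
    '\n'
    'event: message_stop\n'
    'data: {"type":"message_stop"}\n'
)

print(collect_text(SAMPLE))  # Hello world
```

Events other than content_block_delta are parsed but ignored here; they carry metadata such as the stop reason and token usage.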

Python Implementation

import anthropic

client = anthropic.Anthropic()  # Reads ANTHROPIC_API_KEY (and ANTHROPIC_BASE_URL, if set) from the environment

# Basic streaming
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Write a short poem about APIs"}]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
    print()  # Newline at end

    # Get the final message with usage info while the stream is still open
    final_message = stream.get_final_message()

print(f"Tokens used: {final_message.usage.input_tokens} in, {final_message.usage.output_tokens} out")

Async Python Streaming

import anthropic
import asyncio

async def stream_response():
    client = anthropic.AsyncAnthropic()

    async with client.messages.stream(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        messages=[{"role": "user", "content": "Explain streaming APIs"}]
    ) as stream:
        async for text in stream.text_stream:
            print(text, end="", flush=True)

asyncio.run(stream_response())

Node.js / TypeScript Implementation

import Anthropic from '@anthropic-ai/sdk';

const client = new Anthropic();

async function streamResponse() {
  const stream = await client.messages.stream({
    model: 'claude-sonnet-4-20250514',
    max_tokens: 1024,
    messages: [{ role: 'user', content: 'Explain streaming APIs' }],
  });

  for await (const event of stream) {
    if (
      event.type === 'content_block_delta' &&
      event.delta.type === 'text_delta'
    ) {
      process.stdout.write(event.delta.text);
    }
  }

  const finalMessage = await stream.finalMessage();
  console.log('\nTokens:', finalMessage.usage);
}

streamResponse().catch(console.error);  // Surface API or network errors instead of an unhandled rejection

Raw HTTP with curl

curl https://claude4u.com/antigravity/v1/messages \
  -H "x-api-key: $ANTHROPIC_API_KEY" \
  -H "content-type: application/json" \
  -H "anthropic-version: 2023-06-01" \
  --no-buffer \
  -d '{
    "model": "claude-sonnet-4-20250514",
    "max_tokens": 1024,
    "stream": true,
    "messages": [{"role": "user", "content": "Hello, Claude!"}]
  }'

Tip: When using a relay service like claude4u.com, streaming works identically to the direct API. The relay transparently proxies SSE events with minimal added latency. Set ANTHROPIC_BASE_URL to the relay endpoint and use your relay API key.
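The curl request above can also be reproduced in Python without the SDK. The following is a minimal sketch using only the standard library. The function names build_stream_request and stream_lines are hypothetical, and the sketch assumes that ANTHROPIC_BASE_URL, when set, is a prefix to which /v1/messages is appended, with https://api.anthropic.com as the fallback.

```python
import json
import os
import urllib.request

def build_stream_request(prompt: str) -> urllib.request.Request:
    """Build (but do not send) a streaming Messages API request."""
    # Assumption: the base URL is a prefix that takes /v1/messages
    base_url = os.environ.get("ANTHROPIC_BASE_URL", "https://api.anthropic.com")
    body = {
        "model": "claude-sonnet-4-20250514",
        "max_tokens": 1024,
        "stream": True,  # Ask for SSE instead of a single JSON object
        "messages": [{"role": "user", "content": prompt}],
    }
    return urllib.request.Request(
        url=base_url.rstrip("/") + "/v1/messages",
        data=json.dumps(body).encode("utf-8"),
        headers={
            "x-api-key": os.environ.get("ANTHROPIC_API_KEY", ""),
            "content-type": "application/json",
            "anthropic-version": "2023-06-01",
        },
        method="POST",
    )

def stream_lines(request: urllib.request.Request):
    """Send the request and yield decoded SSE lines as they arrive."""
    with urllib.request.urlopen(request, timeout=60) as response:
        for raw_line in response:
            yield raw_line.decode("utf-8").rstrip("\n")
```

Iterating over stream_lines(build_stream_request("Hello, Claude!")) should print the same event: and data: lines that curl shows. Nothing is sent until stream_lines is iterated.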

Building a Streaming Web Endpoint

Here is how to proxy Claude's streaming response through your own Express.js server:

import Anthropic from '@anthropic-ai/sdk';
import express from 'express';

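const app = express();
app.use(express.json());  // Parse JSON bodies so req.body.messages is defined
const client = new Anthropic();

app.post('/api/chat', async (req, res) => {
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');

  const stream = await client.messages.stream({
    model: 'claude-sonnet-4-20250514',
    max_tokens: 1024,
    messages: req.body.messages,
  });

  // If the client disconnects before we finish, abort the upstream request
  res.on('close', () => {
    if (!res.writableEnded) stream.abort();
  });

  try {
    for await (const event of stream) {
      res.write(`data: ${JSON.stringify(event)}\n\n`);
    }
    res.write('data: [DONE]\n\n');
  } catch (err) {
    // Reached on upstream errors and on the abort triggered by a disconnect
    console.error('Stream ended early:', err);
  } finally {
    res.end();
  }
});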

app.listen(3000);
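The wire format used by the endpoint above is small enough to sketch in Python: each event becomes one "data:" frame, and a [DONE] frame marks the end. Note that [DONE] is this endpoint's own sentinel; Claude's stream itself ends with message_stop. The function names to_sse_frames and read_frames are hypothetical and exist only for this illustration.

```python
import json
from typing import Iterable, Iterator

PREFIX = "data: "

def to_sse_frames(events: Iterable[dict]) -> Iterator[str]:
    """Server side: serialize each event as a data frame, then send [DONE]."""
    for event in events:
        yield f"{PREFIX}{json.dumps(event)}\n\n"
    yield f"{PREFIX}[DONE]\n\n"

def read_frames(frames: Iterable[str]) -> Iterator[dict]:
    """Client side: decode frames until the [DONE] sentinel appears."""
    for frame in frames:
        if not frame.startswith(PREFIX):
            continue  # Skip anything that is not a data frame
        payload = frame[len(PREFIX):].strip()
        if payload == "[DONE]":
            return
        yield json.loads(payload)

events = [
    {"type": "content_block_delta", "delta": {"type": "text_delta", "text": "Hi"}},
    {"type": "message_stop"},
]
frames = list(to_sse_frames(events))
print(list(read_frames(frames)) == events)  # True
```

A browser client would apply the same logic to whatever arrives from /api/chat, stopping at the sentinel.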

Handling Stream Interruptions

Network issues can interrupt streaming connections. Implement proper cleanup:

# Python: handle Ctrl+C (SIGINT) so a running stream can be cancelled cleanly
import signal

def handle_interrupt(signum, frame):
    print("\nStream cancelled by user")
    raise KeyboardInterrupt

signal.signal(signal.SIGINT, handle_interrupt)

Warning: Always handle client disconnections in server-side streaming implementations. If a client disconnects mid-stream, you must abort the upstream API request to avoid wasting tokens. In Node.js, use AbortController; in other runtimes, use the equivalent request cancellation mechanism.
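The cleanup idea can be sketched independently of any SDK: consume chunks inside try/finally so the connection is released whether the stream finishes, the user presses Ctrl+C, or the network drops. Here consume_with_cleanup and flaky_stream are hypothetical names, close stands in for whatever releases the upstream request, and the built-in ConnectionError stands in for the client library's own connection error type.

```python
from typing import Callable, Iterable, List, Tuple

def consume_with_cleanup(
    chunks: Iterable[str],
    close: Callable[[], None],
) -> Tuple[str, bool]:
    """Collect text chunks and always call close(), even when interrupted.

    Returns (text_received_so_far, completed).
    """
    received: List[str] = []
    completed = False
    try:
        for chunk in chunks:
            received.append(chunk)
        completed = True
    except (KeyboardInterrupt, ConnectionError):
        # Ctrl+C or a dropped connection: keep the partial text
        pass
    finally:
        close()  # Release the upstream connection in every case
    return "".join(received), completed

def flaky_stream():
    """Simulated stream that fails after two chunks."""
    yield "Hello"
    yield ", wor"
    raise ConnectionError("connection reset")

closed = []
text, completed = consume_with_cleanup(flaky_stream(), lambda: closed.append(True))
print(repr(text), completed, closed)  # 'Hello, wor' False [True]
```

Returning the partial text together with a completed flag lets the caller decide whether to show the truncated answer, retry, or discard it.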

Streaming Best Practices

  1. Handle all event types: Do not assume only content_block_delta events exist. Handle message_start, message_delta, and error events as well.
  2. Capture usage data: Output token usage is reported in the message_delta event at the end of the stream. Record it for cost tracking.
  3. Implement timeouts: Set connection and read timeouts to detect stalled streams.
  4. Manage buffers: Flush output buffers so streamed tokens are not batched before they reach the user.
  5. Consider a relay service: Services like claude4u.com optimize streaming connections and handle reconnection logic.
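The first two practices can be sketched as a small dispatcher that looks at every event type and records usage. StreamSummary and handle_event are hypothetical names. The sketch assumes that message_start carries a usage object with input_tokens (the listing earlier in this guide elides that part of the payload) and that error events carry an error object, so check both against the API reference before relying on them.

```python
from dataclasses import dataclass, field
from typing import List, Optional

@dataclass
class StreamSummary:
    text_parts: List[str] = field(default_factory=list)
    input_tokens: Optional[int] = None
    output_tokens: Optional[int] = None
    stop_reason: Optional[str] = None
    error: Optional[dict] = None

    @property
    def text(self) -> str:
        return "".join(self.text_parts)

def handle_event(summary: StreamSummary, event: dict) -> None:
    """Update the summary from one decoded stream event."""
    kind = event.get("type")
    if kind == "message_start":
        # Assumption: input token count is reported here
        usage = event["message"].get("usage", {})
        summary.input_tokens = usage.get("input_tokens")
    elif kind == "content_block_delta":
        delta = event["delta"]
        if delta.get("type") == "text_delta":
            summary.text_parts.append(delta["text"])
    elif kind == "message_delta":
        summary.stop_reason = event["delta"].get("stop_reason")
        summary.output_tokens = event.get("usage", {}).get("output_tokens")
    elif kind == "error":
        summary.error = event.get("error")
    # Other types (content_block_start, content_block_stop, message_stop)
    # need no action in this sketch

events = [
    {"type": "message_start", "message": {"usage": {"input_tokens": 10}}},
    {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "Hello"}},
    {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": " world"}},
    {"type": "message_delta", "delta": {"stop_reason": "end_turn"}, "usage": {"output_tokens": 12}},
    {"type": "message_stop"},
]

summary = StreamSummary()
for event in events:
    handle_event(summary, event)

print(summary.text, summary.input_tokens, summary.output_tokens, summary.stop_reason)
# Hello world 10 12 end_turn
```

Keeping the handler as a plain function over decoded dictionaries means the same logic works whether events come from the SDK, from raw HTTP, or from your own proxy.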
