OpenAI Streaming SSE Implementation Guide for Python and Node.js

Streaming lets you receive tokens from the OpenAI API as they are generated instead of waiting for the full response. In chat applications this reduces perceived latency, because text starts appearing as soon as the first tokens arrive. Streaming uses Server-Sent Events (SSE), a standard format for pushing a sequence of events from server to client over a single long-lived HTTP response.

How OpenAI Streaming Works

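When you set stream: true in your API request (stream=True in the Python SDK), the server keeps the HTTP connection open and sends chunks as they become available. Each chunk arrives as an SSE data: line carrying a JSON object whose delta field holds the next piece of content. The last content chunk has an empty delta and a non-null finish_reason, and the stream ends with a [DONE] sentinel.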

The response format for each chunk looks like this:

data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","choices":[{"index":0,"delta":{"content":"Hello"},"finish_reason":null}]}

data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","choices":[{"index":0,"delta":{"content":" world"},"finish_reason":null}]}

data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}

data: [DONE]
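
To make the wire format above concrete, here is a minimal sketch of a parser that turns raw data: lines into content fragments. The function name iter_sse_deltas is hypothetical, and the sketch assumes every event fits on a single data: line, as in the sample above. The official SDKs do this parsing for you, so this is for illustration only.

```python
import json
from typing import Iterable, Iterator


def iter_sse_deltas(lines: Iterable[str]) -> Iterator[str]:
    """Yield content fragments from raw SSE lines of a chat completion stream."""
    for raw in lines:
        line = raw.strip()
        # Skip blank separator lines and any field other than "data:".
        if not line.startswith("data:"):
            continue
        payload = line[len("data:"):].strip()
        if payload == "[DONE]":
            break  # sentinel: the stream is complete
        chunk = json.loads(payload)
        for choice in chunk.get("choices", []):
            content = choice.get("delta", {}).get("content")
            if content:
                yield content


# The four events from the sample above, with blank lines between them.
sample = [
    'data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","choices":[{"index":0,"delta":{"content":"Hello"},"finish_reason":null}]}',
    "",
    'data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","choices":[{"index":0,"delta":{"content":" world"},"finish_reason":null}]}',
    "",
    'data: {"id":"chatcmpl-abc","object":"chat.completion.chunk","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}',
    "",
    "data: [DONE]",
]

text = "".join(iter_sse_deltas(sample))
print(text)  # prints "Hello world"
```

The chunk with the empty delta contributes nothing to the text; it only signals why generation stopped.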

Python Streaming Implementation

from openai import OpenAI

client = OpenAI(base_url="https://claude4u.com/v1")

stream = client.chat.completions.create(
    model="gpt-4o",
    messages=[
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "Explain how neural networks learn."}
    ],
    stream=True
)

collected_content = []
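for chunk in stream:
    # Some chunks (for example a usage-only chunk) carry no choices; skip them
    if not chunk.choices:
        continue
    delta = chunk.choices[0].delta
    if delta.content:
        print(delta.content, end="", flush=True)
        collected_content.append(delta.content)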

full_response = "".join(collected_content)
print()  # Newline after streaming completes

Python Async Streaming

For async applications (FastAPI, aiohttp), use the async client:

from openai import AsyncOpenAI

client = AsyncOpenAI(base_url="https://claude4u.com/v1")

async def stream_response(user_message: str):
    stream = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": user_message}],
        stream=True
    )

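    async for chunk in stream:
        # Skip chunks that carry no choices (for example a usage-only chunk)
        if not chunk.choices:
            continue
        delta = chunk.choices[0].delta
        if delta.content:
            yield delta.content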
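
Because stream_response is an async generator, callers consume it with async for. The sketch below shows that pattern without touching the network: fake_stream_response is a hypothetical stand-in that yields canned fragments, and collect is an illustrative helper name, not part of any SDK.

```python
import asyncio
from typing import AsyncIterator


async def fake_stream_response(user_message: str) -> AsyncIterator[str]:
    """Hypothetical stand-in for stream_response: yields canned fragments."""
    for fragment in ["Echo: ", user_message]:
        await asyncio.sleep(0)  # hand control back to the event loop, as a network read would
        yield fragment


async def collect(fragments: AsyncIterator[str]) -> str:
    """Print fragments as they arrive and return the joined text."""
    parts = []
    async for fragment in fragments:
        print(fragment, end="", flush=True)
        parts.append(fragment)
    print()  # newline once the stream is exhausted
    return "".join(parts)


result = asyncio.run(collect(fake_stream_response("hi")))
```

Swapping fake_stream_response for the real stream_response should leave collect unchanged, since both are async generators of strings.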

Node.js Streaming Implementation

import OpenAI from 'openai';

const client = new OpenAI({ baseURL: 'https://claude4u.com/v1' });

const stream = await client.chat.completions.create({
    model: 'gpt-4o',
    messages: [
        { role: 'system', content: 'You are a helpful assistant.' },
        { role: 'user', content: 'Explain how neural networks learn.' }
    ],
    stream: true
});

const chunks = [];
for await (const chunk of stream) {
    const content = chunk.choices[0]?.delta?.content;
    if (content) {
        process.stdout.write(content);
        chunks.push(content);
    }
}

const fullResponse = chunks.join('');
console.log(); // Newline after streaming

FastAPI Streaming Endpoint

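import json

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import OpenAI

app = FastAPI()
client = OpenAI(base_url="https://claude4u.com/v1")

@app.post("/chat")
async def chat(message: str):  # a bare str parameter is read from the query string
    def generate():
        stream = client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": message}],
            stream=True
        )
        for chunk in stream:
            if not chunk.choices:
                continue
            content = chunk.choices[0].delta.content
            if content:
                # JSON-encode so newlines in the content cannot break SSE framing
                payload = json.dumps({"content": content})
                yield f"data: {payload}\n\n"
        yield "data: [DONE]\n\n"

    # A sync generator is iterated in a worker thread, so it does not block the event loop
    return StreamingResponse(generate(), media_type="text/event-stream")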
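
One detail of SSE framing matters for endpoints like this: an event ends at the first blank line, so a payload that itself contains newlines has to be split across several data: lines (or encoded, for example as JSON). The helper below is a small sketch of the splitting approach; format_sse is a hypothetical name, and the sketch only handles "\n" line breaks, not carriage returns.

```python
from typing import Optional


def format_sse(data: str, event: Optional[str] = None) -> str:
    """Frame a payload as one SSE event, terminated by a blank line."""
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    # Each line of the payload gets its own "data:" field;
    # the client joins them back together with "\n".
    for part in data.split("\n"):
        lines.append(f"data: {part}")
    return "\n".join(lines) + "\n\n"


print(repr(format_sse("line one\nline two")))
# prints 'data: line one\ndata: line two\n\n'
```

Streamed model output often contains newlines (lists, code, paragraphs), so some form of escaping is needed whenever raw text is placed in a data: field.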

Express.js Streaming Endpoint

import express from 'express';
import OpenAI from 'openai';

const app = express();
app.use(express.json()); // parse JSON bodies so req.body.message is defined
const client = new OpenAI({ baseURL: 'https://claude4u.com/v1' });

app.post('/chat', async (req, res) => {
    res.setHeader('Content-Type', 'text/event-stream');
    res.setHeader('Cache-Control', 'no-cache');
    res.setHeader('Connection', 'keep-alive');

    const stream = await client.chat.completions.create({
        model: 'gpt-4o',
        messages: [{ role: 'user', content: req.body.message }],
        stream: true
    });

    for await (const chunk of stream) {
        const content = chunk.choices[0]?.delta?.content;
        if (content) {
            res.write(`data: ${JSON.stringify({ content })}\n\n`);
        }
    }
    res.write('data: [DONE]\n\n');
    res.end();
});

Handling Stream Errors

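from openai import OpenAI, APIConnectionError, APIStatusError

client = OpenAI(base_url="https://claude4u.com/v1")

try:
    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": "Hello"}],
        stream=True
    )
    for chunk in stream:
        if not chunk.choices:
            continue
        content = chunk.choices[0].delta.content
        if content:
            print(content, end="", flush=True)
except APIStatusError as e:
    # The API answered with a non-2xx status, so a status code is available
    print(f"\nAPI error: {e.status_code} - {e.message}")
except APIConnectionError as e:
    # Network failure or timeout: no HTTP response, hence no status code
    print(f"\nConnection error: {e}")
except Exception as e:
    print(f"\nStream interrupted: {e}")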
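
Beyond reporting errors, a common follow-up is to retry. The sketch below is one possible policy, not a prescribed one: it retries with exponential backoff only if the stream failed before yielding anything, because restarting after partial output would repeat text the user has already seen. The names stream_with_retry and flaky_stream are hypothetical, and the broad except Exception is a simplification; with the OpenAI SDK you would likely narrow it to connection-type errors such as APIConnectionError.

```python
import time
from typing import Callable, Iterable, Iterator


def stream_with_retry(
    make_stream: Callable[[], Iterable[str]],
    max_attempts: int = 3,
    base_delay: float = 0.5,
) -> Iterator[str]:
    """Yield fragments from make_stream(), retrying only if nothing was yielded yet."""
    for attempt in range(1, max_attempts + 1):
        yielded_any = False
        try:
            for fragment in make_stream():
                yielded_any = True
                yield fragment
            return  # stream finished normally
        except Exception:
            # Give up if output was already shown or attempts are exhausted.
            if yielded_any or attempt == max_attempts:
                raise
            time.sleep(base_delay * 2 ** (attempt - 1))  # exponential backoff


calls = {"count": 0}


def flaky_stream() -> Iterator[str]:
    """Hypothetical stream factory: fails on the first call, succeeds on the second."""
    calls["count"] += 1
    if calls["count"] == 1:
        raise ConnectionError("simulated drop before the first chunk")
    yield from ["Hello", " world"]


recovered = "".join(stream_with_retry(flaky_stream, base_delay=0.01))
```

Here make_stream would wrap the client.chat.completions.create call plus the loop that extracts delta.content, so each retry issues a fresh request.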

Warning: When streaming, token counts are not included by default. To receive them, pass stream_options={"include_usage": True} in your request. The usage field is then populated on one extra final chunk, sent just before [DONE], whose choices list is empty; on every other chunk usage is null.

Getting Usage Data from Streams

stream = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello"}],
    stream=True,
    stream_options={"include_usage": True}
)

for chunk in stream:
    if chunk.usage:
        print(f"Tokens used: {chunk.usage.total_tokens}")
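
In practice you usually want both the text and the token count from the same loop. The sketch below combines them and guards against the usage-only chunk, whose choices list is empty. It runs against hypothetical stand-in objects (fake_chunk) shaped like the SDK's chunks, and the token count of 12 is an invented value for illustration.

```python
from types import SimpleNamespace
from typing import Iterable, Optional, Tuple


def collect_text_and_usage(stream: Iterable) -> Tuple[str, Optional[int]]:
    """Return the joined text and the total token count (None if never reported)."""
    parts = []
    total_tokens = None
    for chunk in stream:
        # The usage-only chunk has an empty choices list, so check before indexing.
        if chunk.choices:
            content = chunk.choices[0].delta.content
            if content:
                parts.append(content)
        if chunk.usage:
            total_tokens = chunk.usage.total_tokens
    return "".join(parts), total_tokens


def fake_chunk(content=None, total_tokens=None, with_choice=True):
    """Hypothetical stand-in shaped like the SDK's chunk objects."""
    choices = [SimpleNamespace(delta=SimpleNamespace(content=content))] if with_choice else []
    usage = SimpleNamespace(total_tokens=total_tokens) if total_tokens is not None else None
    return SimpleNamespace(choices=choices, usage=usage)


fake_stream = [
    fake_chunk("Hello"),
    fake_chunk(" world"),
    fake_chunk(None),                                # finish chunk with an empty delta
    fake_chunk(total_tokens=12, with_choice=False),  # usage-only final chunk
]

text, tokens = collect_text_and_usage(fake_stream)
print(text, tokens)  # prints "Hello world 12"
```

Passing the real stream object from the request above in place of fake_stream should work the same way, since the function only reads choices, delta.content, and usage.total_tokens.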

Tip: claude4u.com supports streaming for all of its models, including OpenAI, Claude, and Gemini. The relay service maintains persistent connections and handles upstream reconnection automatically, which is intended to make streaming more reliable than direct API calls in regions with unstable network conditions.
