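OpenAI Streaming Output Tutorial
OpenAI streaming output lets your application display AI-generated content in real time, which noticeably improves the user experience. This article covers how the SSE protocol works, implementations in Python and Node.js, and how to render streamed data in the frontend.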
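Why use streaming output?
- Faster time to first byte: display can begin before generation is complete
- Better user experience: text appears incrementally, similar to the ChatGPT interface
- Support for long text: data arrives continuously, so long replies are less likely to fail on a timeout
- Early interruption: users can stop generation at any time if they are unsatisfied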
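The last point in the list, stopping early, can be sketched as follows. This is a minimal illustration rather than part of the original tutorial: `stream_until` and `should_stop` are hypothetical names, the stream is assumed to yield plain text fragments, and the `close()` call is an assumption about the stream object that is checked at runtime.

```python
from typing import Callable, Iterable, List


def stream_until(fragments: Iterable[str], should_stop: Callable[[str], bool]) -> str:
    """Collect text fragments until should_stop(text_so_far) returns True."""
    collected: List[str] = []
    try:
        for fragment in fragments:
            collected.append(fragment)
            if should_stop("".join(collected)):
                break  # stop consuming; remaining fragments are left unread
    finally:
        # Assumption: real stream objects often expose close() to release the
        # underlying HTTP connection; plain iterators do not, so check first.
        close = getattr(fragments, "close", None)
        if callable(close):
            close()
    return "".join(collected)


# Usage with a fake stream standing in for real API output.
fake_stream = iter(["Stream", "ing ", "is ", "fast ", "and ", "cheap"])
text = stream_until(fake_stream, lambda so_far: len(so_far) >= 12)
print(text)
```

In a real application the stop condition would more likely be a user action, such as clicking a stop button, than a length check.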
SSE Protocol Format
OpenAI streaming responses are based on Server-Sent Events (SSE). Each event is a data: line followed by a blank line, and the stream ends with a [DONE] sentinel:
data: {"id":"chatcmpl-xxx","object":"chat.completion.chunk","choices":[{"index":0,"delta":{"content":"你"},"finish_reason":null}]}

data: {"id":"chatcmpl-xxx","object":"chat.completion.chunk","choices":[{"index":0,"delta":{"content":"好"},"finish_reason":null}]}

data: [DONE]
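To make the format concrete, here is a minimal sketch of a parser that extracts the text fragments from such an event stream. The function name `iter_deltas` is hypothetical, and the sketch assumes the whole response is already available as one string; a real client reads it incrementally.

```python
import json
from typing import Iterator


def iter_deltas(sse_text: str) -> Iterator[str]:
    """Yield the delta.content strings found in a block of SSE text."""
    for line in sse_text.splitlines():
        if not line.startswith("data:"):
            continue  # skip blank separators and any non-data lines
        payload = line[len("data:"):].strip()
        if payload == "[DONE]":
            return  # end-of-stream sentinel
        chunk = json.loads(payload)
        for choice in chunk.get("choices", []):
            content = choice.get("delta", {}).get("content")
            if content:
                yield content


sample = (
    'data: {"id":"chatcmpl-xxx","object":"chat.completion.chunk",'
    '"choices":[{"index":0,"delta":{"content":"你"},"finish_reason":null}]}\n\n'
    'data: {"id":"chatcmpl-xxx","object":"chat.completion.chunk",'
    '"choices":[{"index":0,"delta":{"content":"好"},"finish_reason":null}]}\n\n'
    "data: [DONE]\n\n"
)
print("".join(iter_deltas(sample)))  # prints 你好
```

The official SDKs used in the following sections perform this parsing internally, so hand-written parsing is only needed when working with the raw HTTP response.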
Python Streaming Implementation
from openai import OpenAI

client = OpenAI(
    api_key="your-api-key",
    base_url="https://claude4u.com/v1"
)

# Basic streaming call
stream = client.chat.completions.create(
    model="gpt-4o",
    messages=[
        {"role": "user", "content": "Explain Python's decorator mechanism in detail"}
    ],
    stream=True
)

collected_content = []
for chunk in stream:
    if not chunk.choices:
        continue  # some chunks may carry no choices
    delta = chunk.choices[0].delta
    if delta.content:
        # Print each fragment as it arrives and keep it for later
        print(delta.content, end="", flush=True)
        collected_content.append(delta.content)

full_response = "".join(collected_content)
print(f"\n\nFull response length: {len(full_response)} characters")
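Besides the text itself, the final chunk of a stream carries a non-null finish_reason (for example "stop" or "length"), which tells you whether the reply ended naturally or was cut off. The sketch below shows one way to capture both. `collect_stream` and `fake_chunk` are hypothetical helpers, and the fake chunks only imitate the attribute shape of real chunk objects so that the example runs without network access.

```python
from types import SimpleNamespace
from typing import Iterable, Optional, Tuple


def collect_stream(chunks: Iterable) -> Tuple[str, Optional[str]]:
    """Return (full_text, finish_reason) from chat.completion.chunk-like objects."""
    parts = []
    finish_reason = None
    for chunk in chunks:
        if not chunk.choices:
            continue  # some chunks may carry no choices
        choice = chunk.choices[0]
        if choice.delta.content:
            parts.append(choice.delta.content)
        if choice.finish_reason is not None:
            finish_reason = choice.finish_reason
    return "".join(parts), finish_reason


def fake_chunk(content, finish_reason=None):
    """Build a stand-in object with the same attribute shape as a real chunk."""
    delta = SimpleNamespace(content=content)
    return SimpleNamespace(choices=[SimpleNamespace(delta=delta, finish_reason=finish_reason)])


chunks = [fake_chunk("Hel"), fake_chunk("lo"), fake_chunk(None, "stop")]
text, reason = collect_stream(chunks)
print(text, reason)  # prints Hello stop
```

With a real stream, the call would be `collect_stream(stream)`, where `stream` is the object returned by `client.chat.completions.create(..., stream=True)`.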
Asynchronous Streaming Call
import asyncio
from openai import AsyncOpenAI

async_client = AsyncOpenAI(
    api_key="your-api-key",
    base_url="https://claude4u.com/v1"
)

async def stream_chat(prompt):
    stream = await async_client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    async for chunk in stream:
        # Skip chunks without choices, then print each text fragment
        if chunk.choices and chunk.choices[0].delta.content:
            print(chunk.choices[0].delta.content, end="", flush=True)

asyncio.run(stream_chat("What is a microservice architecture?"))
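The asynchronous client pays off mainly when several streams run at the same time. The following sketch illustrates that pattern with `asyncio.gather`. It is an illustration under assumptions: `fake_stream` is a hypothetical stand-in for an API stream that yields text fragments, used so the example runs offline.

```python
import asyncio
from typing import AsyncIterator, List


async def fake_stream(fragments: List[str], delay: float) -> AsyncIterator[str]:
    """Stand-in for an async API stream: yields fragments with a pause between them."""
    for fragment in fragments:
        await asyncio.sleep(delay)
        yield fragment


async def collect(stream: AsyncIterator[str]) -> str:
    """Consume one stream and return its concatenated text."""
    parts = []
    async for fragment in stream:
        parts.append(fragment)
    return "".join(parts)


async def main() -> List[str]:
    # Both streams are consumed concurrently; results keep the argument order.
    return await asyncio.gather(
        collect(fake_stream(["Hello", ", ", "world"], 0.01)),
        collect(fake_stream(["foo", "bar"], 0.01)),
    )


results = asyncio.run(main())
print(results)  # prints ['Hello, world', 'foobar']
```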
Node.js Streaming Implementation
import OpenAI from 'openai';

const client = new OpenAI({
  apiKey: 'your-api-key',
  baseURL: 'https://claude4u.com/v1'
});

const stream = await client.chat.completions.create({
  model: 'gpt-4o',
  messages: [{ role: 'user', content: 'Write a simple HTTP server in TypeScript' }],
  stream: true
});

// Write each text fragment to stdout as it arrives
for await (const chunk of stream) {
  const content = chunk.choices[0]?.delta?.content;
  if (content) {
    process.stdout.write(content);
  }
}
Rendering Streamed Data in the Frontend
Using fetch + ReadableStream
async function streamChat(message) {
  const response = await fetch('https://claude4u.com/v1/chat/completions', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': 'Bearer your-api-key'
    },
    body: JSON.stringify({
      model: 'gpt-4o',
      messages: [{ role: 'user', content: message }],
      stream: true
    })
  });
  if (!response.ok) {
    throw new Error(`Request failed with status ${response.status}`);
  }

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  const outputEl = document.getElementById('output');
  let buffer = ''; // holds a partial line left over from the previous chunk

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    // stream: true keeps multi-byte characters intact across chunk boundaries
    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split('\n');
    buffer = lines.pop(); // the last element may be an incomplete line
    for (const line of lines) {
      if (!line.startsWith('data: ')) continue;
      const data = line.slice(6).trim(); // strip the "data: " prefix
      if (data === '[DONE]') return;
      const parsed = JSON.parse(data);
      const content = parsed.choices[0]?.delta?.content;
      if (content) {
        outputEl.textContent += content;
      }
    }
  }
}
Using EventSource (simplified)
// Note: EventSource only supports GET requests, so it usually needs a backend relay
const eventSource = new EventSource('/api/stream?prompt=Hello');

eventSource.onmessage = (event) => {
  if (event.data === '[DONE]') {
    eventSource.close();
    return;
  }
  const data = JSON.parse(event.data);
  const content = data.choices[0]?.delta?.content;
  if (content) {
    document.getElementById('output').textContent += content;
  }
};
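The EventSource example relies on a backend relay behind /api/stream, which the tutorial does not show. Assuming that relay is written in Python, its core job can be sketched as a generator that re-emits upstream chunks as SSE events. `to_sse_events` is a hypothetical helper and the upstream chunks below are made-up sample data. A real relay would forward chunks from the API and hand this generator to the web framework's streaming response with the content type text/event-stream.

```python
import json
from typing import Iterable, Iterator


def to_sse_events(chunks: Iterable[dict]) -> Iterator[str]:
    """Re-emit upstream chunk dicts as SSE events, ending with [DONE]."""
    for chunk in chunks:
        # One event is a "data:" line followed by a blank line.
        yield f"data: {json.dumps(chunk, ensure_ascii=False)}\n\n"
    yield "data: [DONE]\n\n"


# Hypothetical upstream chunks used only for illustration.
upstream = [
    {"choices": [{"index": 0, "delta": {"content": "Hi"}, "finish_reason": None}]},
    {"choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}]},
]
events = list(to_sse_events(upstream))
print("".join(events), end="")
```

Because each event keeps the same JSON shape as the upstream API, the onmessage handler above can parse it unchanged.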
Streaming Error Handling
from openai import OpenAI

client = OpenAI(
    api_key="your-api-key",
    base_url="https://claude4u.com/v1"
)

try:
    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": "Hello"}],
        stream=True
    )
    for chunk in stream:
        # Skip chunks without choices, then print each text fragment
        if chunk.choices and chunk.choices[0].delta.content:
            print(chunk.choices[0].delta.content, end="", flush=True)
except Exception as e:
    print(f"\nStreaming error: {e}")
    # Optionally fall back to non-streaming mode
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": "Hello"}],
        stream=False
    )
    print(response.choices[0].message.content)
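An alternative to falling back is to retry the stream. The sketch below is one possible approach, not the tutorial's own method: `stream_with_retry` and `open_stream` are hypothetical names, and the flaky stream only simulates a failing connection. It retries with exponential backoff, but only while nothing has been shown to the user, because retrying after partial output would duplicate text.

```python
import time
from typing import Callable, Iterable, Iterator


def stream_with_retry(
    open_stream: Callable[[], Iterable[str]],
    max_attempts: int = 3,
    base_delay: float = 0.5,
) -> Iterator[str]:
    """Yield fragments, reopening the stream if it fails before producing output."""
    for attempt in range(1, max_attempts + 1):
        emitted = False
        try:
            for fragment in open_stream():
                emitted = True
                yield fragment
            return  # stream finished normally
        except Exception:
            # Give up if text was already emitted or no attempts remain.
            if emitted or attempt == max_attempts:
                raise
            time.sleep(base_delay * 2 ** (attempt - 1))  # exponential backoff


attempts = {"count": 0}


def flaky_stream() -> Iterable[str]:
    """Simulated stream that fails twice before succeeding."""
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("simulated upstream failure")
    return iter(["Hello", " world"])


result = "".join(stream_with_retry(flaky_stream, base_delay=0.01))
print(result)  # prints Hello world
```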
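Tip: Streaming through the claude4u.com 轻舟 AI relay service comes with built-in connection keep-alive and automatic reconnection, which keeps streaming stable even when the upstream service briefly fluctuates.
Note: If your application sits behind an Nginx reverse proxy, add proxy_buffering off; to its configuration. Otherwise Nginx buffers the streamed response, and the content arrives all at once instead of incrementally.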
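As a rough illustration of where that directive goes, a location block might look like the fragment below. The path /api/ and the upstream address 127.0.0.1:8000 are placeholders, not values from this tutorial; adjust them to your own deployment.

```nginx
location /api/ {
    # Placeholder upstream: replace with your own backend address
    proxy_pass http://127.0.0.1:8000;

    # Pass response data to the client as soon as it is received
    proxy_buffering off;

    # Allow long-running streamed responses (value chosen for illustration)
    proxy_read_timeout 300s;
}
```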