
一套代码接入 40 AI 模型统一处理不同厂商的 SSE 流式输出格式。为什么需要统一 SSE 处理调用过多个 AI 模型 API 的前端开发者都会遇到一个头疼的问题各家厂商的 SSEServer-Sent Events返回格式不一样。同样是流式对话不同厂商的数据块长得完全不同OpenAI 格式ChatGPT / DeepSeekdata:{id:chatcmpl-xxx,object:chat.completion.chunk,choices:[{delta:{content:你好}}]}Anthropic 格式Claudeevent: message_start data: {type:message_start,message:{content:[]}} event: content_block_delta data: {type:content_block_delta,delta:{type:text_delta,text:你好}}通义千问格式data:{output:{choices:[{message:{content:你好}}]}}三种格式三种字段路径。如果你的应用需要同时支持多个模型按传统方式每个模型写一套解析逻辑代码会迅速膨胀成灾难。更棘手的是SSE 协议本身还有一些坑TCP 分片一个完整的data: {...}\n\n可能被 TCP 拆成两次传输前端收到的chunk可能是一个 JSON 的一半多行 data部分厂商的一条消息中有多个data:行空行和注释\n和: heartbeat\n需要过滤架构设计统一的 SSE 管道核心思路将不同厂商的 SSE 响应通过适配器模式统一转换为前端组件可消费的标准流。┌──────────────────────────────────────────────────┐ │ Vue 组件 │ │ streamingText, isStreaming, error │ └────────────────────┬─────────────────────────────┘ │ ┌────────────────────▼─────────────────────────────┐ │ 统一流式响应处理器 │ │ 文本累积 / 错误处理 / 中断控制 / 重连机制 │ └────────────────────┬─────────────────────────────┘ │ ┌────────────────────▼─────────────────────────────┐ │ SSE 解析器 │ │ 字节流 → 行缓冲 → 事件解析 → 标准事件对象 │ └────────────────────┬─────────────────────────────┘ │ ┌────────────┼────────────┐ ▼ ▼ ▼ OpenAI适配器 Anthropic适配器 通义千问适配器 (choices[0] (content_block (output.choices .delta _delta.text) [0].message .content) .content)第一步底层 SSE 解析器先从最底层的字节流解析开始。fetchAPI 的ReadableStream是处理 SSE 的最佳选择相比EventSource它可以支持 POST 请求携带请求体自定义请求头Authorization 等手动控制取消和超时/** * SSE 事件解析器将原始字节流解析为标准事件对象 * 处理 TCP 分片、多行 data、注释行等边界情况 */classSSEParser{privatebufferprivatedecodernewTextDecoder()/** 从 ReadableStream 读取并逐行解析返回异步迭代器 */async*parse(body:ReadableStreamUint8Array):AsyncGeneratorSSEEvent{constreaderbody.getReader()try{while(true){const{done,value}awaitreader.read()if(done){// 流结束处理缓冲区剩余内容if(this.buffer.trim()){consteventthis.parseBuffer()if(event)yieldevent}break}// 解码并追加到行缓冲区this.bufferthis.decoder.decode(value,{stream:true})// 按行分割处理yield*this.processLines()}}finally{reader.releaseLock()}}/** 处理缓冲区中的完整行 */private*processLines():GeneratorSSEEvent{while(true){constidxthis.buffer.indexOf(\n)if(idx-1)break// 没有完整行等待更多数据constlinethis.buffer.slice(0,idx).trimEnd()this.bufferthis.buffer.slice(idx1)// 跳过空行和注释if(!line||line.startsWith(:))continue// 解析 field: value 格式constcolonIdxline.indexOf(:)if(colonIdx-1)continueconstfieldline.slice(0,colonIdx)letvalueline.slice(colonIdx1)if(value.startsWith( ))valuevalue.slice(1)if(fielddata){// 累积 data 行直到遇到空行letdatavaluewhile(true){constnextIdxthis.buffer.indexOf(\n)if(nextIdx-1)breakconstnextLinethis.buffer.slice(0,nextIdx).trimEnd()if(nextLine||nextLine.startsWith(:)){// 空行或注释表示事件结束this.bufferthis.buffer.slice(nextIdx1)yield{data}break}this.bufferthis.buffer.slice(nextIdx1)if(nextLine.startsWith(data:)){data\nnextLine.slice(5).replace(/^/,)}}}}}/** 处理缓冲区中不完整的最终内容 */privateparseBuffer():SSEEvent|null{constlinethis.buffer.trim()if(line.startsWith(data:)){return{data:line.slice(5).replace(/^/,)}}returnnull}}interfaceSSEEvent{data:string}第二步模型适配器每种模型只需要一个轻量级适配器负责从原始 JSON 提取文本内容/** 适配器接口从原始 JSON 提取文本增量 */interfaceModelAdapter{/** 从 SSE 事件 data 中提取文本片段返回 null 表示非内容事件如心跳 */extractDelta(rawData:string):string|null}/** OpenAI 兼容格式DeepSeek、豆包、ChatGLM 等均兼容 */constopenaiAdapter:ModelAdapter{extractDelta(raw:string){try{constparsedJSON.parse(raw)returnparsed?.choices?.[0]?.delta?.content??null}catch{returnnull}}}/** Anthropic 格式Claude 系列 */constanthropicAdapter:ModelAdapter{extractDelta(raw:string){try{constparsedJSON.parse(raw)if(parsed?.typecontent_block_delta){returnparsed?.delta?.text??null}returnnull}catch{returnnull}}}/** 通义千问格式 */constqwenAdapter:ModelAdapter{extractDelta(raw:string){try{constparsedJSON.parse(raw)returnparsed?.output?.choices?.[0]?.message?.content??null}catch{returnnull}}}/** 适配器注册表 */constadapterRegistry:Recordstring,ModelAdapter{openai:openaiAdapter,anthropic:anthropicAdapter,qwen:qwenAdapter,}第三步统一流式响应处理器将解析器和适配器组合提供上层组件直接使用的 APIinterfaceStreamOptions{url:stringbody:Recordstring,anyadapter:keyoftypeofadapterRegistry signal?:AbortSignalonToken:(text:string)voidonDone:(fullText:string)voidonError:(error:Error)void}/** * 发起 SSE 流式请求自动解析并回调文本增量 * * 使用示例 * streamChat({ * url: /ai/v1/chat/completions, * body: { model: deepseek-v3, messages: [...] }, * adapter: openai, * onToken: (text) { fullText.value text }, * onDone: (text) { loading.value false }, * onError: (err) { error.value err.message }, * }) */asyncfunctionstreamChat(options:StreamOptions){const{url,body,adapter:adapterName,signal,onToken,onDone,onError}optionsconstmodelAdapteradapterRegistry[adapterName]if(!modelAdapter){onError(newError(未知的模型适配器${adapterName}))return}constcontrollernewAbortController()constmergedSignalsignal?combineSignals(signal,controller.signal):controller.signaltry{constresponseawaitfetch(url,{method:POST,headers:{Content-Type:application/json},body:JSON.stringify({...body,stream:true}),signal:mergedSignal,// 携带 HttpOnly Cookie 用于鉴权防御 XSS 窃取 tokencredentials:include,})if(!response.ok){consterrorBodyawaitresponse.text().catch(())thrownewError(请求失败 (${response.status}):${errorBody})}constparsernewSSEParser()letfullTextforawait(consteventofparser.parse(response.body!)){if(event.data[DONE])breakconstdeltamodelAdapter.extractDelta(event.data)if(delta!null){fullTextdeltaonToken(delta)}}onDone(fullText)}catch(err:any){if(err.nameAbortError)returnonError(errinstanceofError?err:newError(String(err)))}}/** 合并两个 AbortSignal */functioncombineSignals(a:AbortSignal,b:AbortSignal):AbortSignal{constcontrollernewAbortController()constonAbort()controller.abort()a.addEventListener(abort,onAbort)b.addEventListener(abort,onAbort)if(a.aborted||b.aborted)controller.abort()returncontroller.signal}第四步Vue 3 Composable封装为 Vue 组合式函数组件中使用起来极其简洁// composables/useAiChat.tsimport{ref}fromvueexportfunctionuseAiChat(){conststreamingTextref()constisStreamingref(false)consterrorrefstring|null(null)letabortController:AbortController|nullnullasyncfunctionsend(model:string,adapter:keyoftypeofadapterRegistry,messages:Array{role:string;content:string}){// 中断上一个请求abortController?.abort()abortControllernewAbortController()streamingText.valueisStreaming.valuetrueerror.valuenullawaitstreamChat({url:/ai/v1/chat/completions,body:{model,messages},adapter,signal:abortController.signal,onToken:(text){streamingText.valuetext},onDone:(){isStreaming.valuefalse},onError:(err){error.valueerr.message isStreaming.valuefalse},})}functionstop(){abortController?.abort()isStreaming.valuefalse}return{streamingText,isStreaming,error,send,stop}}组件中使用script setup langts import { useAiChat } from /composables/useAiChat const { streamingText, isStreaming, error, send, stop } useAiChat() const handleSend (message: string) { // 切换模型只需改 adapter 参数无需修改任何解析逻辑 send(deepseek-v3, openai, [{ role: user, content: message }]) } /script template div classchat-output v-textstreamingText / button v-ifisStreaming clickstop停止生成/button div v-iferror classerror{{ error }}/div /template工程化要点1. 错误重试策略网络抖动导致 SSE 连接断开时自动重试需要配合后端 API 的幂等性。建议在前端做指数退避asyncfunctionstreamWithRetry(options:StreamOptions,maxRetries3){for(leti0;imaxRetries;i){try{awaitstreamChat(options)return// 成功}catch(err){if(imaxRetries-1){options.onError(errasError)return}// 指数退避: 1s, 2s, 4sawaitnewPromise(rsetTimeout(r,Math.pow(2,i)*1000))}}}2. 内存控制流式响应可能产生大量文本如生成一篇长文章fullText字符串持续拼接会产生 GC 压力。如果只需要实时展示且不保留完整历史可以考虑只保留onToken回调不在内部累积fullText。3. 组件卸载时的清理import{onUnmounted}fromvueonUnmounted((){stop()// 组件销毁时自动中断正在进行的 SSE 连接})4. 多模型并排对比有了统一的 SSE 管道实现同一问题同时发送给多个模型并排展示结果变得非常简单// 同时向 DeepSeek、通义千问、ChatGLM 发起请求Promise.all([send(deepseek-v3,openai,messages),send(qwen3-max,qwen,messages),send(chatglm-4,openai,messages),// ChatGLM 兼容 OpenAI 格式])每个模型的流式输出独立走各自管道互不干扰。为什么选择统一的 API 网关上述架构虽然优雅但前提是——你使用的 API 平台已经处理好了模型协议的差异。如果用星枢无极这样的统一 AI API 网关前端开发者完全不需要关心底层模型是 OpenAI 格式还是 Anthropic 格式。平台在后端完成协议转换所有模型统一输出 OpenAI 兼容的 SSE 流式格式。这意味着你的前端代码可以简化到极致// 所有 40 模型统一使用 OpenAI 兼容格式awaitstreamChat({url:/ai/v1/chat/completions,body:{model:qwen3-max,// 通义千问// model: deepseek-v3, // DeepSeek// model: claude-opus, // Claudemessages:[...],},adapter:openai,// ← 永远只用这一个适配器})切换模型只需改一个model字段无需修改任何解析代码也无需关心各厂商 SDK 的版本兼容性。总结方案适配器数量切换模型成本维护负担逐模型接入N 个改代码 改解析逻辑极高统一 SSE 管道N 个适配器改 adapter 参数中API 网关 统一管道1 个openai改 model 字段极低核心结论流式响应的本质是字节流 → 结构化事件 → 文本增量的转换链。通过 SSEParser字节流解析 ModelAdapter模型格式适配 streamChat统一 API你可以将这个链路抽象为一次配置、处处复用的前端基础设施。配合 API 网关后端的协议统一前端代码可以做到模型无关——这是规模化接入多模型最优雅的方式。