前端通过fetch模拟调用SSE接口的实现方案
作者:山山而川840
需求背景
团队要在系统集成AI大模型,需要实现类似于市面上的AI问答的聊天窗口。
技术调研
由于大模型接口是SSE接口,我们先了解一下SEE接口的特性。
SSE(Server-Sent Events)是一种基于HTTP的单向服务端推送技术,适用于实时数据更新场景(如新闻推送、股票行情)。以下是其核心特性:
1. 单向通信
- 方向:仅服务端→客户端单向推送,客户端无法通过SSE通道发送数据(需配合其他API如
fetch)。 - 对比WebSocket:WebSocket是双向通信,SSE更轻量且无需额外协议升级。
2. 基于HTTP协议
- 兼容性:直接复用HTTP协议,无需像WebSocket那样升级协议(
Upgrade: websocket)。 - 默认行为:
- 使用简单GET请求建立连接。
- 响应头需包含
Content-Type: text/event-stream。 - 自动处理连接重连(客户端默认重试机制)。
3. 事件流格式
- 数据格式:每条消息以
data:开头,以\n\n结尾,例如:data: {"time": "2025-09-02T00:00:00"}\n\n - 多字段支持:
event:自定义事件类型(如event: update\n)。id:消息ID,用于断线重连时定位。retry:指定重连延迟(毫秒)。
4. 自动重连
- 机制:连接中断后,浏览器自动尝试重新连接(默认间隔约3秒)。
- 控制:可通过
retry字段或监听onerror事件自定义重试逻辑。
5. 适用场景
- 推荐场景:实时通知、日志流、进度更新等服务端主导推送的需求。
- 不适用场景:需要客户端频繁交互(如在线游戏、聊天室)。
6. 限制
- 协议限制:仅支持文本数据(二进制需编码为Base64)。
- 浏览器限制:
- 最大并发连接数(同一域名下通常6个,HTTP/2可复用)。
- 部分老旧浏览器(如IE)不支持。
现有方案
针对SSE接口浏览器其实给我提供了一个简单高效的api(EventSource),他是一个构造函数,其实例会对HTTP服务器开启一个持久化的连接,以 text/event-stream 格式发送事件,此连接会一直保持开启直到通过调用 EventSource.close()关闭,使用示例:
const eventSource = new EventSource('你的SSE接口地址');
eventSource.onmessage = (event) => {
console.log('接收到数据:', event.data);
// 对响应数据进行业务处理
...业务代码
};
eventSource.onerror = (error) => {
console.error('SSE连接错误:', error);
// 可选:重连逻辑
};上面这种方案简单高效,但是他只能发送get请求,并且请求参数不能手动直接添加到URL上,必须通过手动构造URL字符串来实现。
const params = new URLSearchParams({ key1: 'value1', key2: 'value2' });
const url = `/api/v1/sse?${params.toString()}`;
const eventSource = new EventSource(url);如果想了解更多有关 EventSource Web api 信息的,可以研读下面这篇文档,EventSource:https://developer.mozilla.org/zh-CN/docs/Web/API/EventSource
问题来了
由于系统需要集成的SSE接口是post请求,所以上面的方案不适用,经过调研有两种实现方案:
1.使用fetch+ReadableStream实现 SSE
由于fetch 可以通过流式响应逐步读取数据,模拟SSE的持续特性。(强烈推荐)
async function fetchSSE(url, options = {}) {
const response = await fetch(url, {
...options,
// 请求头可根据接口文档自行调整
headers: {
'Accept': 'text/event-stream', // 声明需要SSE格式
...options.headers,
},
});
if (!response.ok) Promise.reject(new Error(response.status));
const reader = response.body.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
const chunk = decoder.decode(value);
// 解析SSE格式的数据(如 "data: ...\n\n")
chunk.split('\n\n').forEach(event => {
if (event.trim()) {
const data = event.replace(/^data: /, '').trim();
// 持续解析数据进行业务处理
...业务代码
}
});
}
}
// 调用示例
fetchSSE('url', {
method: 'POST',
headers: { 'Authorization': 'Bearer xxx' },
body: JSON.stringify({ key: 'value' }),
}).catch(error => {
// 错误处理
})2. 使用XMLHttpRequest(传统Ajax)模拟SSE
这种方式实现需要后端分块传输,然后去监听progress事件,无法真正实现流式解析,不推荐使用,案例代码:
function xhrSSE(url) {
const xhr = new XMLHttpRequest();
xhr.open('GET', url, true);
xhr.setRequestHeader('Accept', 'text/event-stream');
xhr.onprogress = function() {
// 增量获取响应文本(需后端支持分块传输)
const newData = xhr.responseText.substring(lastIndex);
lastIndex = xhr.responseText.length;
// 业务处理
...业务代码
};
xhr.send();
}总结
上面三种方式都能实现SSE接口的调用和响应解析,如果是get请求推荐使用浏览器提供的api(EventSource),如果是post请求则推荐使用fetch;开发过程中遇到的业务场景,记录一下,希望能够帮助到遇到同样问题的小伙伴。
附:fetch 模拟 sse 请求方式封装
// streamUtils.ts
/**
* SSE 请求配置选项
* @template T - 期望解析的数据类型 (默认为 string)
*/
type SSEOptions<T> = {
/**
* 请求目标 URL (必需)
* @example "/api/chat-stream"
*/
url: string;
/**
* fetch 请求初始化配置
* @default { method: 'GET' }
* @example {
* method: "POST",
* headers: { "Authorization": "Bearer token" },
* body: JSON.stringify({ prompt: "Hello" })
* }
*/
requestInit?: RequestInit;
/**
* 自定义事件块解析器
* @param eventChunk - 原始事件字符串 (包含 "data:" 等前缀)
* @returns 解析后的数据对象或 null (表示无效数据)
* @default defaultParser
*/
parser?: (eventChunk: string) => T | null;
/**
* 数据到达回调 (必需)
* @param data - 解析后的数据对象
*/
onOpen: (data: String) => void;
onData: (data: T) => void;
/**
* 错误处理回调
* @param error - 遇到的错误对象
*/
onError?: (error: Error) => void;
/**
* 流接收完成回调
*/
onComplete?: () => void;
};
/**
* SSE 流处理核心方法
* @template T - 期望解析的数据类型
* @param options - 配置选项
* @returns 包含中止方法的对象 { abort: () => void }
*
* @example 基本使用
* const { abort } = fetchStream({
* url: "/api/stream",
* onData: data => console.log(data),
* onError: err => console.error(err)
* });
*
* @example 带自定义解析器
* fetchStream({
* parser: chunk => ({ msg: chunk.trim() }),
* // ...其他配置
* });
*/
export function fetchStream<T = string>(options: SSEOptions<T>) {
// 创建中止控制器用于中断请求
const controller = new AbortController();
// 文本解码器用于处理二进制流
const decoder = new TextDecoder();
// 保存不完整的事件块 (跨 chunk 的场景)
let partialChunk = '';
// 解构配置参数并设置默认值
const { parser = defaultParser, onOpen, onData, onError, onComplete } = options;
onOpen && onOpen('会话开始')
/**
* 处理流数据的内部方法
* @param response - fetch 返回的响应对象
*/
async function handleStream(response: Response) {
try {
// 获取可读流读取器
const reader = response.body?.getReader();
if (!reader) throw new Error('Failed to get stream reader');
// 持续读取数据流
while (true) {
const { done, value } = await reader.read();
if (done) break; // 流读取结束
// 解码当前 chunk 并拼接之前未完成的数据
const chunk = decoder.decode(value, { stream: true });
// 使用通用行结束符分割事件块 (兼容不同系统)
// 注意:SSE 规范要求用 \n\n 分割,但某些服务可能使用 \r\n\r\n
const events = (partialChunk + chunk).split(/\r\n\r\n|\n\n/);
// 保存未完成的事件块供下次处理
partialChunk = events.pop() || '';
// 处理每个完整的事件块
for (const eventChunk of events) {
const data = parser(eventChunk);
if (data !== null) {
onData(data); // 触发数据回调
}
}
}
// 处理剩余数据 (最后一个事件块)
if (partialChunk) {
const data = parser(partialChunk);
if (data !== null) onData(data);
}
onComplete?.(); // 触发完成回调
} catch (error) {
// 忽略主动中断产生的错误
if (error instanceof Error && error.name !== 'AbortError') {
onError?.(error);
}
}
}
// 发起 fetch 请求
fetch(options.url, {
...options.requestInit, // 用户自定义配置
signal: controller.signal, // 绑定中止信号
headers: {
Accept: 'text/event-stream', // 确保接收 SSE 流
...options.requestInit?.headers, // 合并用户自定义 headers
},
})
.then(handleStream)
.catch((error) => {
// 处理初始请求错误 (如网络问题)
if (error instanceof Error && error.name !== 'AbortError') {
onError?.(error);
}
});
// 返回中止方法供外部调用
return {
/** 中止当前请求 */
abort: () => controller.abort(),
};
}
/**
* 默认 SSE 解析器
* @param eventChunk - 原始事件字符串
* @returns 解析后的数据对象或 null
*
* @example 输入示例
* "data: Hello\ndata: World\n\n"
*
* @example 输出结果
* "Hello\nWorld" (自动合并多个 data 行)
*/
function defaultParser<T = string>(eventChunk: string): T | null {
try {
let content = '';
// 逐行处理事件内容
for (const line of eventChunk.split('\n')) {
// 仅处理 data 字段 (忽略 event/id/retry 等)
if (line.startsWith('data:')) {
content += line.slice(5).trim() + '\n'; // 保留换行结构
}
}
content = content.trim(); // 去除首尾空白
// 空内容返回 null
if (!content) return null;
// 尝试解析为 JSON,失败则返回原始字符串
try {
return JSON.parse(content) as T;
} catch {
return content as unknown as T;
}
} catch (e) {
console.error('SSE parsing error:', e);
return null;
}
}
调用方法
let abortController: (() => void) | null = null; // 中止控制器
// 发起SSE请求
const { abort } = fetchStream({
url: `/ai/chat/memory`,
requestInit: {
method: "POST",
body: formData
},
onOpen(data) {
// 开始
},
onData(data) {
// 有返回数据了
},
onError(err) {
console.log("会话失败");
error.value = `请求失败: ${err.message}`;
},
onComplete() {
console.log("会话结束");
},
});
abortController = abort; // 保存中止函数
};// 组件卸载时中止请求
onUnmounted(() => {
abortController?.();
});到此这篇关于前端通过fetch模拟调用SSE接口的文章就介绍到这了,更多相关前端模拟调用SSE接口内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
