javascript技巧

关注公众号 jb51net

关闭
首页 > 网络编程 > JavaScript > javascript技巧 > 前端模拟调用SSE接口

前端通过fetch模拟调用SSE接口的实现方案

作者:山山而川840

SSE即Server-Sent Events,是一种服务器向浏览器推送数据的技术,并且不断地发送数据,保持连接打开,这篇文章主要介绍了前端通过fetch模拟调用SSE接口的实现方案,需要的朋友可以参考下

需求背景

团队要在系统集成AI大模型,需要实现类似于市面上的AI问答的聊天窗口。

技术调研

由于大模型接口是SSE接口,我们先了解一下SEE接口的特性。

SSE(Server-Sent Events)是一种基于HTTP的单向服务端推送技术,适用于实时数据更新场景(如新闻推送、股票行情)。以下是其核心特性:

1. 单向通信

2. 基于HTTP协议

3. 事件流格式

4. 自动重连

5. 适用场景

6. 限制

现有方案

针对SSE接口浏览器其实给我提供了一个简单高效的api(EventSource),他是一个构造函数,其实例会对HTTP服务器开启一个持久化的连接,以 text/event-stream 格式发送事件,此连接会一直保持开启直到通过调用 EventSource.close()关闭,使用示例:

const eventSource = new EventSource('你的SSE接口地址');

eventSource.onmessage = (event) => {
  console.log('接收到数据:', event.data);
  // 对响应数据进行业务处理
    ...业务代码
};

eventSource.onerror = (error) => {
  console.error('SSE连接错误:', error);
  // 可选:重连逻辑
};

上面这种方案简单高效,但是他只能发送get请求,并且请求参数不能手动直接添加到URL上,必须通过手动构造URL字符串来实现。

const params = new URLSearchParams({ key1: 'value1', key2: 'value2' });
const url = `/api/v1/sse?${params.toString()}`;
const eventSource = new EventSource(url);

如果想了解更多有关 EventSource Web api 信息的,可以研读下面这篇文档EventSource:https://developer.mozilla.org/zh-CN/docs/Web/API/EventSource

问题来了

由于系统需要集成的SSE接口是post请求,所以上面的方案不适用,经过调研有两种实现方案:

1.使用fetch+ReadableStream实现 SSE

由于fetch 可以通过流式响应逐步读取数据,模拟SSE的持续特性。(强烈推荐)

async function fetchSSE(url, options = {}) {
  const response = await fetch(url, {
    ...options,
    // 请求头可根据接口文档自行调整
    headers: {
      'Accept': 'text/event-stream', // 声明需要SSE格式
      ...options.headers,
    },
  });

  if (!response.ok) Promise.reject(new Error(response.status));

  const reader = response.body.getReader();
  const decoder = new TextDecoder();

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    const chunk = decoder.decode(value);
    // 解析SSE格式的数据(如 "data: ...\n\n")
    chunk.split('\n\n').forEach(event => {
      if (event.trim()) {
        const data = event.replace(/^data: /, '').trim();
        // 持续解析数据进行业务处理
        ...业务代码
      }
    });
  }
}

// 调用示例
fetchSSE('url', {
  method: 'POST',
  headers: { 'Authorization': 'Bearer xxx' },
  body: JSON.stringify({ key: 'value' }),
}).catch(error => {
    // 错误处理
})

2. 使用XMLHttpRequest(传统Ajax)模拟SSE

这种方式实现需要后端分块传输,然后去监听progress事件,无法真正实现流式解析,不推荐使用,案例代码:

function xhrSSE(url) {
  const xhr = new XMLHttpRequest();
  xhr.open('GET', url, true);
  xhr.setRequestHeader('Accept', 'text/event-stream');
  xhr.onprogress = function() {
    // 增量获取响应文本(需后端支持分块传输)
    const newData = xhr.responseText.substring(lastIndex);
    lastIndex = xhr.responseText.length;
    // 业务处理
    ...业务代码
  };
  xhr.send();
}

总结

上面三种方式都能实现SSE接口的调用和响应解析,如果是get请求推荐使用浏览器提供的api(EventSource),如果是post请求则推荐使用fetch;开发过程中遇到的业务场景,记录一下,希望能够帮助到遇到同样问题的小伙伴。

附:fetch 模拟 sse 请求方式封装

// streamUtils.ts

/**
 * SSE 请求配置选项
 * @template T - 期望解析的数据类型 (默认为 string)
 */
type SSEOptions<T> = {
  /**
   * 请求目标 URL (必需)
   * @example "/api/chat-stream"
   */
  url: string;

  /**
   * fetch 请求初始化配置
   * @default { method: 'GET' }
   * @example {
   *   method: "POST",
   *   headers: { "Authorization": "Bearer token" },
   *   body: JSON.stringify({ prompt: "Hello" })
   * }
   */
  requestInit?: RequestInit;

  /**
   * 自定义事件块解析器
   * @param eventChunk - 原始事件字符串 (包含 "data:" 等前缀)
   * @returns 解析后的数据对象或 null (表示无效数据)
   * @default defaultParser
   */
  parser?: (eventChunk: string) => T | null;

  /**
   * 数据到达回调 (必需)
   * @param data - 解析后的数据对象
   */
  onOpen: (data: String) => void;
  onData: (data: T) => void;

  /**
   * 错误处理回调
   * @param error - 遇到的错误对象
   */
  onError?: (error: Error) => void;

  /**
   * 流接收完成回调
   */
  onComplete?: () => void;
};

/**
 * SSE 流处理核心方法
 * @template T - 期望解析的数据类型
 * @param options - 配置选项
 * @returns 包含中止方法的对象 { abort: () => void }
 *
 * @example 基本使用
 * const { abort } = fetchStream({
 *   url: "/api/stream",
 *   onData: data => console.log(data),
 *   onError: err => console.error(err)
 * });
 *
 * @example 带自定义解析器
 * fetchStream({
 *   parser: chunk => ({ msg: chunk.trim() }),
 *   // ...其他配置
 * });
 */
export function fetchStream<T = string>(options: SSEOptions<T>) {
  // 创建中止控制器用于中断请求
  const controller = new AbortController();

  // 文本解码器用于处理二进制流
  const decoder = new TextDecoder();

  // 保存不完整的事件块 (跨 chunk 的场景)
  let partialChunk = '';

  // 解构配置参数并设置默认值
  const { parser = defaultParser, onOpen, onData, onError, onComplete } = options;
  onOpen && onOpen('会话开始')
  /**
   * 处理流数据的内部方法
   * @param response - fetch 返回的响应对象
   */
  async function handleStream(response: Response) {
    try {
      // 获取可读流读取器
      const reader = response.body?.getReader();
      if (!reader) throw new Error('Failed to get stream reader');

      // 持续读取数据流
      while (true) {
        const { done, value } = await reader.read();
        if (done) break; // 流读取结束

        // 解码当前 chunk 并拼接之前未完成的数据
        const chunk = decoder.decode(value, { stream: true });

        // 使用通用行结束符分割事件块 (兼容不同系统)
        // 注意:SSE 规范要求用 \n\n 分割,但某些服务可能使用 \r\n\r\n
        const events = (partialChunk + chunk).split(/\r\n\r\n|\n\n/);

        // 保存未完成的事件块供下次处理
        partialChunk = events.pop() || '';

        // 处理每个完整的事件块
        for (const eventChunk of events) {
          const data = parser(eventChunk);
          if (data !== null) {
            onData(data); // 触发数据回调
          }
        }
      }

      // 处理剩余数据 (最后一个事件块)
      if (partialChunk) {
        const data = parser(partialChunk);
        if (data !== null) onData(data);
      }

      onComplete?.(); // 触发完成回调
    } catch (error) {
      // 忽略主动中断产生的错误
      if (error instanceof Error && error.name !== 'AbortError') {
        onError?.(error);
      }
    }
  }

  // 发起 fetch 请求
  fetch(options.url, {
    ...options.requestInit, // 用户自定义配置
    signal: controller.signal, // 绑定中止信号
    headers: {
      Accept: 'text/event-stream', // 确保接收 SSE 流
      ...options.requestInit?.headers, // 合并用户自定义 headers
    },
  })
    .then(handleStream)
    .catch((error) => {
      // 处理初始请求错误 (如网络问题)
      if (error instanceof Error && error.name !== 'AbortError') {
        onError?.(error);
      }
    });

  // 返回中止方法供外部调用
  return {
    /** 中止当前请求 */
    abort: () => controller.abort(),
  };
}

/**
 * 默认 SSE 解析器
 * @param eventChunk - 原始事件字符串
 * @returns 解析后的数据对象或 null
 *
 * @example 输入示例
 * "data: Hello\ndata: World\n\n"
 *
 * @example 输出结果
 * "Hello\nWorld" (自动合并多个 data 行)
 */
function defaultParser<T = string>(eventChunk: string): T | null {
  try {
    let content = '';
    // 逐行处理事件内容
    for (const line of eventChunk.split('\n')) {
      // 仅处理 data 字段 (忽略 event/id/retry 等)
      if (line.startsWith('data:')) {
        content += line.slice(5).trim() + '\n'; // 保留换行结构
      }
    }
    content = content.trim(); // 去除首尾空白

    // 空内容返回 null
    if (!content) return null;

    // 尝试解析为 JSON,失败则返回原始字符串
    try {
      return JSON.parse(content) as T;
    } catch {
      return content as unknown as T;
    }
  } catch (e) {
    console.error('SSE parsing error:', e);
    return null;
  }
}

调用方法

let abortController: (() => void) | null = null; // 中止控制器

// 发起SSE请求
const { abort } = fetchStream({
   url: `/ai/chat/memory`,
    requestInit: {
      method: "POST",
      body: formData
    },
    onOpen(data) {
      // 开始
    },
    onData(data) {
      // 有返回数据了
     
    },
    onError(err) {
      console.log("会话失败");
      error.value = `请求失败: ${err.message}`;
      
    },
    onComplete() {
      console.log("会话结束");
      
    },
  });

  abortController = abort; // 保存中止函数
};
// 组件卸载时中止请求
onUnmounted(() => {
  abortController?.();
});

到此这篇关于前端通过fetch模拟调用SSE接口的文章就介绍到这了,更多相关前端模拟调用SSE接口内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文