SpringBoot 2.x 接入非标准SSE格式大模型流式响应的实战解决方案
作者:fengzeng
本文介绍了在SpringBoot2.7.3环境中接入非标准SSE格式大模型流式响应的实战解决方案,通过自定义实现,解决了大模型返回数据格式不符合标准SSE规范的问题,关键步骤包括引入Gradle依赖、配置WebClient、处理粘包、格式兼容和双重过滤机制,感兴趣的朋友跟随小编一起看看吧
近期DeepSeek等国产大模型热度持续攀升,其关注度甚至超过了OpenAI(被戏称为CloseAI)。在SpringBoot3.x
环境中,可以使用官方的Spring AI轻松接入,但对于仍在使用JDK8和SpringBoot2.7.3的企业级应用来说,往往需要自定义实现。特别是当大模型团队返回的数据格式不符合标准SSE规范时,更需要灵活处理。本文将分享我们的实战解决方案。
📦 引入Gradle依赖
核心依赖说明:
spring-boot-starter-web
:基础Web支持spring-boot-starter-webflux
:响应式编程支持(WebClient所在模块)
implementation 'org.springframework.boot:spring-boot-starter-web' implementation 'org.springframework.boot:spring-boot-starter-webflux'
🌐 WebClient配置要点
初始化时特别注意Header配置:
@Bean public WebClient init() { return WebClient.builder() .baseUrl(baseUrl) .defaultHeader(HttpHeaders.AUTHORIZATION, "Bearer " + openAi) // ⚠️ 必须设置为JSON格式 .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) .build(); }
🚨 关键踩坑点:初始设置MediaType.TEXT_EVENT_STREAM_VALUE
会导致请求失败,必须使用APPLICATION_JSON_VALUE
🧠 核心处理逻辑
流式请求入口
@GetMapping(value = "/stream/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Flux<String> streamChatEnhanced(@RequestParam("prompt") String prompt) { // 请求体构建 String requestBody = String.format(""" { "model": "%s", "messages": [{"role": "user", "content": "%s"}], "stream": true } """, model, prompt); return webClient.post() // 请求配置 .uri("/v1/chat/completions") .bodyValue(requestBody) .accept(MediaType.TEXT_EVENT_STREAM) .retrieve() .bodyToFlux(DataBuffer.class) // 🔑 关键配置点 .transform(this::processStream) // 重试和超时配置 .retryWhen(Retry.backoff(3, Duration.ofSeconds(1))) .timeout(Duration.ofSeconds(180)); // 错误处理 .doOnError(e -> log.error("Stream error", e)) .doFinally(signal -> log.info("Stream completed: {}", signal)); }
技术原理说明
当使用bodyToFlux(DataBuffer.class)
时:
- ✅ 获得原始字节流控制权
- ❌ 避免自动SSE格式解析(适用于非标准响应)
- 📡 动态数据流处理:类似Java Stream,但数据持续追加
🔧 非标准SSE数据处理
核心处理流程
private Flux<String> processStream(Flux<DataBuffer> dataBufferFlux) { return dataBufferFlux .transform(DataBufferUtils::join) // 字节流合并 .map(buffer -> { // 字节转字符串 String content = buffer.toString(StandardCharsets.UTF_8); DataBufferUtils.release(buffer); return content; }) .flatMap(content -> // 处理粘包问题 Flux.fromArray(content.split("\\r?\\n\\r?\\n"))) .filter(event -> !event.trim().isEmpty()) // 过滤空事件 .map(event -> { // 格式标准化处理 String trimmed = event.trim(); if (trimmed.startsWith("data:")) { String substring = trimmed.substring(5); return substring.startsWith(" ") ? substring.substring(1) : substring; } return trimmed; }) .filter(event -> !event.startsWith("data:")); // 二次过滤 }
三大关键技术点
粘包处理通过
split("\\r?\\n\\r?\\n")
解决网络传输中的消息边界问题,示例原始数据:data:{response1}\n\ndata:{response2}\n\n
格式兼容处理自动去除服务端可能返回的
data:
前缀,同时保留Spring自动添加SSE前缀的能力双重过滤机制确保最终输出不包含任何残留的SSE格式标识
⚠️ 特别注意
当接口设置produces = MediaType.TEXT_EVENT_STREAM_VALUE
时:
Spring WebFlux会自动添加
data:
前缀前端收到的格式示例:
data: {实际内容}
若手动添加
data:
前缀会导致重复:
data: data: {错误内容} // ❌ 错误格式
🛠️ 完整实现代码
// 包声明和导入... @Service @Slf4j public class OpenAiService { // 配置项和初始化 private String openAiApiKey = "sk-xxxxxx"; private String baseUrl = "https://openai.com/xxxx"; private String model = "gpt-4o"; private WebClient webClient; @PostConstruct public void init() { webClient = WebClient.builder() .baseUrl(baseUrl) .defaultHeader(HttpHeaders.AUTHORIZATION, "Bearer " + openAiApiKey) .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) .build(); } @GetMapping(value = "/stream/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Flux<String> streamChatEnhanced(@RequestParam("prompt") String prompt) { // 构建请求体 String requestBody = String.format(""" { "model": "gpt-4o-mini", "messages": [{"role": "user", "content": "%s"}], "stream": true } """, prompt); // 发送流式请求 return webClient.post() .uri("/v1/chat/completions") .bodyValue(requestBody) .retrieve() .onStatus(HttpStatusCode::isError, response -> response.bodyToMono(String.class) .flatMap(error -> Mono.error(new RuntimeException("API Error: " + error))) ) .bodyToFlux(DataBuffer.class) .transform(this::processStream) .retryWhen(Retry.backoff(3, Duration.ofSeconds(1))) .timeout(Duration.ofSeconds(180)) .doOnError(e -> log.error("Stream error", e)) .doFinally(signal -> log.info("Stream completed: {}", signal)); } private Flux<String> processStream(Flux<DataBuffer> dataBufferFlux) { return dataBufferFlux // 使用字节流处理 .transform(DataBufferUtils::join) .map(buffer -> { String content = buffer.toString(StandardCharsets.UTF_8); DataBufferUtils.release(buffer); return content; }) // 按 SSE 事件边界,防止粘包的问题 .flatMap(content -> Flux.fromArray(content.split("\\r?\\n\\r?\\n"))) // 过滤空事件 .filter(event -> !event.trim().isEmpty()) // 规范 SSE 事件格式 .map(event -> { String trimmed = event.trim(); // 由于webflux设置了"produces = MediaType.TEXT_EVENT_STREAM_VALUE", // 所以在返回数据时会自动添加“data:”,因此如果返回的格式带了“data:”需要手动去除 if (trimmed.startsWith("data:")) { trimmed = trimmed.replaceFirst("data:","").trim(); } return trimmed; }) .filter(event -> !event.startsWith("data:")); } }
到此这篇关于SpringBoot 2.x 接入非标准SSE格式大模型流式响应实践的文章就介绍到这了,更多相关SpringBoot 2.x 接入非标准SSE格式大模型流式响应内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!