Spring WebFlux 流式数据拉取与推送的实现
作者:on the way 123
本文介绍了使用Spring WebFlux实现流式数据拉取与推送的方案。文章首先展示了流式返回数据的格式(类似DeepSeek大模型的推送模式),然后详细讲解了三个核心实现部分:1)通过Flux.create实现流式响应数据的桥接转发;2)配置OkHttpClient的HTTP客户端参数(特别是readTimeout和callTimeout设为0以支持流式传输);3)核心数据获取方法queryDifficultFaultMessage的实现,包括异步请求处理、错误处理和取消订阅机制。该方案实现了后端对原始数
前言
1,流式返回数据类型如下,是不断的推送数据,类似于主流DeepSeek大模型模式,推送数据一点点推送,直至推送结束或者主动点击停止
data:{"conversationId":"eef6023a-6ace-420f-ad46-324aa53decf8","event":"agent_message","answer":""} data:{"conversationId":"eef6023a-6ace-420f-ad46-324aa53decf8","event":"agent_message","answer":""} data:{"conversationId":"eef6023a-6ace-420f-ad46-324aa53decf8","event":"agent_message","answer":"故障"} data:{"conversationId":"eef6023a-6ace-420f-ad46-324aa53decf8","event":"agent_message","answer":"根"} data:{"conversationId":"eef6023a-6ace-420f-ad46-324aa53decf8","event":"agent_message","answer":"因"} .... data:{"conversationId":"eef6023a-6ace-420f-ad46-324aa53decf8","event":"agent_message","answer":":\n\n"} data:{"conversationId":"eef6023a-6ace-420f-ad46-324aa53decf8","event":"message_end","answer":"一"}```
2,流式接口又称基于响应式编程(Reactive Programming)的服务器端到服务器端(Service-to-Service)的流式数据拉取与推送的实现。
3,它的核心作用是:从一个流式API(SSE)消费数据,并立即将其作为流式响应转发给客户端
4,功能要求,后段不做数据处理,大模型接口返回什么数据,直接回传给前端,分阻塞返回,【当时做的效果是:大模型返回数据,后端自动拼接结束后返回给前端,这种效果很不友好,导致请求的时间比较长】
功能实现
1.流式响应数据
public Flux<DifficultFaultMessageVo> streamingQueryDifficultFault(DifficultFaultMessageDto paramDto) { // 这个 Flux 代表了从下游服务获取的原始数据流。 Flux<DifficultFaultMessageVo> faults = queryDifficultFaultMessage(paramDto); return Flux.create(sink -> { // faults.subscribeOn(Schedulers.boundedElastic()): 这行代码至关重要。它告诉上游的 faults 流在 boundedElastic 调度器上执行其订阅操作(即执行网络请求和处理响应)。 faults.subscribeOn(Schedulers.boundedElastic()) // 这里创建了一个新的 Flux。create 方法允许我们手动控制如何向这个流中发射数据。我们传入一个 Consumer,它接收一个 FluxSink 对象(这里的参数名为 sink)作为参数。Sink(汇)就是数据流的出口,我们可以通过它发射数据 (next)、错误 (error) 或完成信号 (complete)。 .subscribe(sink::next, sink::error, sink::complete); }); }
代码解析:
- subscribe(sink::next, sink::error, sink::complete): 这里订阅了从 queryDifficultFaults 返回的原始流。
- 当原始流 (faults) 产生一个数据 (DifficultFaultMessageVo 对象) 时,就通过 sink::next 将它转发给我们新创建的流的 Sink。
- 当原始流发生错误时,通过 sink::error 将错误转发给新流的 Sink。
- 当原始流结束时,通过 sink::complete 结束新流的 Sink。
总结:这个方法的作用可以理解为 “流的桥接” 。它将在一个弹性线程上执行的、可能阻塞的原始数据流,桥接成一个适合在WebFlux等响应式Web框架中返回的响应式流。外部调用者(如Controller)只需返回这个方法的返回值,框架就会自动处理流的订阅和HTTP响应体的流式写入。
2.HTTP客户端配置 okhttpclient
private final OkHttpClient httpClient = new OkHttpClient.Builder() .connectTimeout(120, TimeUnit.SECONDS) // 连接超时2分钟 .readTimeout(0, TimeUnit.SECONDS) // 读取超时:0(无限等待,对于流式响应关键!) .writeTimeout(120, TimeUnit.SECONDS) // 写入超时2分钟 .callTimeout(0, TimeUnit.SECONDS) // 整个调用超时:0(无限等待) .retryOnConnectionFailure(true) // 自动重试连接失败 .connectionPool(new ConnectionPool(20, 5, TimeUnit.MINUTES)) // 连接池(20个空闲连接,存活5分钟) .build();
代码解读:
这个配置是流式HTTP客户端的灵魂,每一项都针对长连接和流式传输进行了优化:
- readTimeout(0): 这是最关键的配置。普通的HTTP请求需要设置读取超时,但流式响应是一个长时间存在的连接,数据会分块持续发送。设置为 0 表示永不超时,客户端会一直等待服务器发送更多数据,直到连接被服务器或自己主动关闭。
- callTimeout(0): 同理,整个调用的总时间也不应设限。
- connectTimeout 和 writeTimeout: 这两个仍然需要设置一个合理的值,分别控制建立TCP连接的时间和发送请求体的时间,这些操作不应该无限等待。
- connectionPool: 使用连接池可以复用TCP连接,避免为每个请求都进行三次握手,极大提升性能。这里配置了最多保持20个空闲连接,每个空闲连接最多存活5分钟。
- retryOnConnectionFailure: 网络抖动时自动重试,提高鲁棒性。
3.核心数据获取方法
private Flux<DifficultFaultMessageVo> queryDifficultFaultMessage(DifficultFaultMessageDto paramDto) { return Flux.create(emitter -> { // 这个emitter是内部Flux的Sink Request request = buildRequest(paramDto); Call call = httpClient.newCall(request); call.enqueue(new Callback() { // 异步执行HTTP请求 @Override public void onFailure(...) { if (!emitter.isCancelled()) { emitter.error(...); // 网络失败,向上游发射错误 } } @Override public void onResponse(...) throws IOException { if (!response.isSuccessful()) { if (!emitter.isCancelled()) { emitter.error(...); // HTTP状态码非2xx,向上游发射错误 } return; } // 成功响应,开始处理流式响应体 processResponseStream(response, emitter); } }); // 重要:注册取消回调 emitter.onCancel(() -> { if (!call.isCanceled()) { call.cancel(); // 如果下游取消订阅(如客户端断开),则取消OkHttp请求 } }); }); }
构建请求
private Request buildRequest(DifficultFaultMessageDto paramDto) { // 2.组装请求头信息 MediaType parse = MediaType.parse("application/json;charset=UTF-8"); // 3.组装请求体信息 JSONObject requestBody = new JSONObject(); requestBody.put("x x x", paramDto.getxxxType()); requestBody.put("apiKey", paramDto.getApiKey()); // 省略业务代码 ... log.info("API请求体: {}", requestBody); return new Request.Builder().url("http://xxx.x.xx.xx:8000/servicexxx/r/postApi").post(body) .addHeader("X-APP-ID", "xx09xx3xx0d7").addHeader("X-APP-KEY", "9jfksjfjkxxkkssdc") .addHeader("Content-Type", "application/json").build(); }
代码解读:
- 目的:创建一个 Flux,用于封装对下游服务的异步HTTP调用和流式响应处理。
执行流程:
- 构建请求 (buildRequest) 和调用对象 (Call)。
- 异步执行 (call.enqueue) HTTP请求。
在回调中:
- 失败 (onFailure): 检查内部的 FluxSink (emitter) 是否还未被取消(即下游是否还在关心结果),如果是,则发射一个错误信号。
- 成功 (onResponse): 检查HTTP状态码,如果不成功则发射错误;如果成功,则调用 processResponseStream 开始处理响应体流。
- emitter.onCancel(…): 这是响应式编程中资源清理的关键。它注册了一个回调,当这个 Flux 的下游订阅者取消订阅时(例如,前端用户关闭了浏览器标签页),这个回调会被触发。回调里会取消底层的OkHttp Call 对象,从而立即关闭网络连接,避免资源泄漏。这是一种“背压”(Backpressure)传播,体现了响应式的优点。
4.流式响应体处理 【最核心部分】
private void processResponseStream(Response response, FluxSink<DifficultFaultsVo> emitter) { try (ResponseBody responseBody = response.body()) { // 使用try-with-resources确保资源关闭 ... BufferedSource source = responseBody.source(); // 获取缓冲数据源 AtomicBoolean isComplete = new AtomicBoolean(false); // 标志位,是否收到结束事件 try { while (!emitter.isCancelled()) { // 循环,只要下游没有取消就继续读 String line = source.readUtf8Line(); // 读取一行UTF-8文本 if (line == null) break; // 读到null表示流自然结束(服务器关闭连接) if (StringUtils.isBlank(line)) continue; // 忽略空行 // SSE协议格式:每段数据以"data: "开头 if (line.startsWith("data:")) { String jsonData = line.substring(6); // 截取"data: "后面的JSON字符串 // 过滤:只处理包含"answer"或"message_end"的数据行 if (!jsonData.contains("answer") && !jsonData.contains("message_end")) { continue; } DifficultFaultMessageVo vo = processModelResponse(jsonData); // 解析JSON为值对象 if (vo != null) { emitter.next(vo); // 解析成功,立即发射给下游(最终到前端) // 如果遇到结束事件,标记完成并结束循环 if ("message_end".equals(vo.getEvent())) { isComplete.set(true); emitter.complete(); break; } } } } } catch (Exception e) { // 处理读取和解析过程中的异常 if (!emitter.isCancelled()){ emitter.error(e); } } finally { // 确保流最终完成 if (!isComplete.get() && !emitter.isCancelled()){ emitter.complete(); } } ... } catch (Exception e) { ... } finally { response.close(); // 最终确保HTTP响应被关闭 } } private DifficultFaultMessageVo processModelResponse(String jsonData) { try { // 1. 解析JSON DifficultFaultMessageVo rawResponse = JSONUtil.toBean( jsonData, DifficultFaultMessageVo.class, false ); // 2. 过滤非消息事件 String event = rawResponse.getEvent(); // 3. 创建前端响应对象 DifficultFaultMessageVo result = new DifficultFaultMessageVo(); result.setConversationId(rawResponse.getConversationId()); // 4. 处理结束事件 if ("message_end".equals(event)) { result.setEvent("message_end"); result.setAnswer(""); return result; } result.setAnswer(rawResponse.getAnswer()); result.setEvent(rawResponse.getEvent()); return result; } catch (Exception e) { log.error("JSON解析错误: {} - {}", jsonData, e.getMessage()); return null; } }
代码解读:
- 核心任务:从 ResponseBody 的流中逐行读取并解析SSE格式。
- SSE格式简介:通常为 data: {json}\n\n。代码只关心以 data: 开头的行。
关键点:
- 逐行读取: source.readUtf8Line() 是阻塞方法,这就是为什么必须在 boundedElastic 线程上执行的原因。
- 过滤: 并非所有 data: 行都需要处理,这里通过检查内容来过滤。
- JSON解析: processModelResponse(jsonData) 方法(代码未给出)负责将JSON字符串解析为 DifficultFaultsVo 对象。
- 实时发射: 一旦解析成功,立即通过 emitter.next(vo) 将数据推送给下游,实现了数据块的零延迟转发。
结束条件:
- 显式结束: 收到 “message_end” 事件,调用 emitter.complete()。
- 隐式结束: 服务器关闭连接(readUtf8Line() 返回 null),跳出循环。
- 异常结束: 捕获到任何异常,调用 emitter.error(e)。
- 健壮性保证: 大量的 if (!emitter.isCancelled()) 检查确保了在下游已经不感兴趣的情况下,不会进行无效的操作(发射数据、错误或完成信号)。finally 块确保了在任何情况下流最终都会被关闭,防止资源泄漏。
总结:
这段代码实现了一个高效、健壮的双重流式处理管道:
- 下游流 (OkHttp -> 本服务):使用配置了长超时的OkHttp客户端,异步调用外部流式API,并在独立的弹性线程上阻塞地、逐行读取SSE响应。
- 上游流 (本服务 -> 客户端):通过Project Reactor的 Flux 和 Sink,将下游获取到的数据块立即、实时地转发给最终的客户端(如Web浏览器)。
关键特性:
- 非阻塞IO: 通过将阻塞操作卸载到专用线程池,保护了Web容器的核心线程。
- 背压传播: 下游的取消订阅会向上传播,最终取消OkHttp请求,及时释放资源。
- 全面错误处理: 对网络错误、HTTP错误、解析错误、连接意外关闭等都有处理。
- 资源安全: 广泛使用 try-with-resources 和 finally 块确保网络连接和响应体被正确关闭。
这是一种在Spring WebFlux等响应式框架中集成传统阻塞式HTTP客户端以消费流式服务的标准且优雅的模式。
到此这篇关于Spring WebFlux 流式数据拉取与推送的实现的文章就介绍到这了,更多相关Spring WebFlux 流式拉取与推送内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!