java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Spring WebFlux 流式拉取与推送

Spring WebFlux 流式数据拉取与推送的实现

作者:on the way 123

本文介绍了使用Spring WebFlux实现流式数据拉取与推送的方案,通过配置长超时、连接池和重试机制优化性能,实现了阻塞与非阻塞的结合,感兴趣的可以了解一下

本文介绍了使用Spring WebFlux实现流式数据拉取与推送的方案。文章首先展示了流式返回数据的格式(类似DeepSeek大模型的推送模式),然后详细讲解了三个核心实现部分:1)通过Flux.create实现流式响应数据的桥接转发;2)配置OkHttpClient的HTTP客户端参数(特别是readTimeout和callTimeout设为0以支持流式传输);3)核心数据获取方法queryDifficultFaultMessage的实现,包括异步请求处理、错误处理和取消订阅机制。该方案实现了后端对原始数

前言

1,流式返回数据类型如下,是不断的推送数据,类似于主流DeepSeek大模型模式,推送数据一点点推送,直至推送结束或者主动点击停止

data:{"conversationId":"eef6023a-6ace-420f-ad46-324aa53decf8","event":"agent_message","answer":""}

data:{"conversationId":"eef6023a-6ace-420f-ad46-324aa53decf8","event":"agent_message","answer":""}

data:{"conversationId":"eef6023a-6ace-420f-ad46-324aa53decf8","event":"agent_message","answer":"故障"}

data:{"conversationId":"eef6023a-6ace-420f-ad46-324aa53decf8","event":"agent_message","answer":"根"}

data:{"conversationId":"eef6023a-6ace-420f-ad46-324aa53decf8","event":"agent_message","answer":"因"}
....
data:{"conversationId":"eef6023a-6ace-420f-ad46-324aa53decf8","event":"agent_message","answer":":\n\n"}

data:{"conversationId":"eef6023a-6ace-420f-ad46-324aa53decf8","event":"message_end","answer":"一"}```

2,流式接口又称基于响应式编程(Reactive Programming)的服务器端到服务器端(Service-to-Service)的流式数据拉取与推送的实现。

3,它的核心作用是:从一个流式API(SSE)消费数据,并立即将其作为流式响应转发给客户端

4,功能要求,后段不做数据处理,大模型接口返回什么数据,直接回传给前端,分阻塞返回,【当时做的效果是:大模型返回数据,后端自动拼接结束后返回给前端,这种效果很不友好,导致请求的时间比较长】

功能实现

1.流式响应数据

public Flux<DifficultFaultMessageVo> streamingQueryDifficultFault(DifficultFaultMessageDto paramDto) {
    // 这个 Flux 代表了从下游服务获取的原始数据流。
    Flux<DifficultFaultMessageVo> faults = queryDifficultFaultMessage(paramDto);
    return Flux.create(sink -> {
        // faults.subscribeOn(Schedulers.boundedElastic()): 这行代码至关重要。它告诉上游的 faults 流在 boundedElastic 调度器上执行其订阅操作(即执行网络请求和处理响应)。
        faults.subscribeOn(Schedulers.boundedElastic())
              // 这里创建了一个新的 Flux。create 方法允许我们手动控制如何向这个流中发射数据。我们传入一个 Consumer,它接收一个 FluxSink 对象(这里的参数名为 sink)作为参数。Sink(汇)就是数据流的出口,我们可以通过它发射数据 (next)、错误 (error) 或完成信号 (complete)。
                .subscribe(sink::next, sink::error, sink::complete);
    });
}

代码解析:

总结:这个方法的作用可以理解为 “流的桥接” 。它将在一个弹性线程上执行的、可能阻塞的原始数据流,桥接成一个适合在WebFlux等响应式Web框架中返回的响应式流。外部调用者(如Controller)只需返回这个方法的返回值,框架就会自动处理流的订阅和HTTP响应体的流式写入。

2.HTTP客户端配置 okhttpclient

private final OkHttpClient httpClient = new OkHttpClient.Builder()
        .connectTimeout(120, TimeUnit.SECONDS) // 连接超时2分钟
        .readTimeout(0, TimeUnit.SECONDS)      // 读取超时:0(无限等待,对于流式响应关键!)
        .writeTimeout(120, TimeUnit.SECONDS)   // 写入超时2分钟
        .callTimeout(0, TimeUnit.SECONDS)      // 整个调用超时:0(无限等待)
        .retryOnConnectionFailure(true)        // 自动重试连接失败
        .connectionPool(new ConnectionPool(20, 5, TimeUnit.MINUTES)) // 连接池(20个空闲连接,存活5分钟)
        .build();

代码解读:

这个配置是流式HTTP客户端的灵魂,每一项都针对长连接和流式传输进行了优化:

3.核心数据获取方法

private Flux<DifficultFaultMessageVo> queryDifficultFaultMessage(DifficultFaultMessageDto paramDto) {
    return Flux.create(emitter -> { // 这个emitter是内部Flux的Sink
        Request request = buildRequest(paramDto);
        Call call = httpClient.newCall(request);
        
        call.enqueue(new Callback() { // 异步执行HTTP请求
            @Override
            public void onFailure(...) {
                if (!emitter.isCancelled()) {
                    emitter.error(...); // 网络失败,向上游发射错误
                }
            }

            @Override
            public void onResponse(...) throws IOException {
                if (!response.isSuccessful()) {
                    if (!emitter.isCancelled()) {
                        emitter.error(...); // HTTP状态码非2xx,向上游发射错误
                    }
                    return;
                }
                // 成功响应,开始处理流式响应体
                processResponseStream(response, emitter);
            }
        });

        // 重要:注册取消回调
        emitter.onCancel(() -> {
            if (!call.isCanceled()) {
                call.cancel(); // 如果下游取消订阅(如客户端断开),则取消OkHttp请求
            }
        });
    });
}

构建请求

    private Request buildRequest(DifficultFaultMessageDto paramDto) {
        // 2.组装请求头信息
        MediaType parse = MediaType.parse("application/json;charset=UTF-8");
        // 3.组装请求体信息
        JSONObject requestBody = new JSONObject();
        requestBody.put("x x x", paramDto.getxxxType());
        requestBody.put("apiKey", paramDto.getApiKey());

		// 省略业务代码
        ... 
        log.info("API请求体: {}", requestBody);
        return new Request.Builder().url("http://xxx.x.xx.xx:8000/servicexxx/r/postApi").post(body)
                .addHeader("X-APP-ID", "xx09xx3xx0d7").addHeader("X-APP-KEY", "9jfksjfjkxxkkssdc")
                .addHeader("Content-Type", "application/json").build();
    }

代码解读:

执行流程:

在回调中:

4.流式响应体处理 【最核心部分】

private void processResponseStream(Response response, FluxSink<DifficultFaultsVo> emitter) {
    try (ResponseBody responseBody = response.body()) { // 使用try-with-resources确保资源关闭
        ...
        BufferedSource source = responseBody.source(); // 获取缓冲数据源
        AtomicBoolean isComplete = new AtomicBoolean(false); // 标志位,是否收到结束事件

        try {
            while (!emitter.isCancelled()) { // 循环,只要下游没有取消就继续读
                String line = source.readUtf8Line(); // 读取一行UTF-8文本
                if (line == null) break; // 读到null表示流自然结束(服务器关闭连接)
                if (StringUtils.isBlank(line)) continue; // 忽略空行

                // SSE协议格式:每段数据以"data: "开头
                if (line.startsWith("data:")) {
                    String jsonData = line.substring(6); // 截取"data: "后面的JSON字符串
                    // 过滤:只处理包含"answer"或"message_end"的数据行
                    if (!jsonData.contains("answer") && !jsonData.contains("message_end")) {
                        continue;
                    }

                    DifficultFaultMessageVo vo = processModelResponse(jsonData); // 解析JSON为值对象
                    if (vo != null) {
                        emitter.next(vo); // 解析成功,立即发射给下游(最终到前端)
                        // 如果遇到结束事件,标记完成并结束循环
                        if ("message_end".equals(vo.getEvent())) {
                            isComplete.set(true);
                            emitter.complete();
                            break;
                        }
                    }
                }
            }
        } catch (Exception e) {
            // 处理读取和解析过程中的异常
            if (!emitter.isCancelled()){
                emitter.error(e);
            }
        } finally {
            // 确保流最终完成
            if (!isComplete.get() && !emitter.isCancelled()){
                emitter.complete();
            }
        }
        ...
    } catch (Exception e) {
        ...
    } finally {
        response.close(); // 最终确保HTTP响应被关闭
    }
}


private DifficultFaultMessageVo processModelResponse(String jsonData) {
        try {
            // 1. 解析JSON
            DifficultFaultMessageVo rawResponse = JSONUtil.toBean(
                    jsonData,
                    DifficultFaultMessageVo.class,
                    false
            );

            // 2. 过滤非消息事件
            String event = rawResponse.getEvent();

            // 3. 创建前端响应对象
            DifficultFaultMessageVo result = new DifficultFaultMessageVo();
            result.setConversationId(rawResponse.getConversationId());

            // 4. 处理结束事件
            if ("message_end".equals(event)) {
                result.setEvent("message_end");
                result.setAnswer("");
                return result;
            }
            result.setAnswer(rawResponse.getAnswer());
            result.setEvent(rawResponse.getEvent());

            return result;

        } catch (Exception e) {
            log.error("JSON解析错误: {} - {}", jsonData, e.getMessage());
            return null;
        }
    }

代码解读:

关键点:

结束条件:

总结:

这段代码实现了一个高效、健壮的双重流式处理管道:

关键特性:

这是一种在Spring WebFlux等响应式框架中集成传统阻塞式HTTP客户端以消费流式服务的标准且优雅的模式。

到此这篇关于Spring WebFlux 流式数据拉取与推送的实现的文章就介绍到这了,更多相关Spring WebFlux 流式拉取与推送内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文