java调用第三方接口实现流式输出的示例代码
作者:爱吃土豆的马铃薯ㅤㅤㅤㅤㅤㅤㅤㅤㅤ
本文主要介绍了java调用第三方接口实现流式输出,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
源代码
@Operation(summary = "问答问题查询接口")
@PostMapping(value = "/question/answer")
public String questionAnswer(@Parameter(description = "用户问题", required = true) @RequestParam("message") String message,
@Parameter(description = "Admin-Token", required = true) @RequestHeader( required = false) String token) throws IOException {
return projectService.questionAnswer(message, token);
}
public String questionAnswer(String message,String token){
String chatNormalResult = this.chatNormal(message,chatModel);
return chatNormalResult;
}
public String chatNormal(String message,String modelName){
try {
long startTime = System.nanoTime();
ObjectMapper objectMapper = new ObjectMapper();
// 调用chatNormal做数据分析
String chatNormalUrl = "http://localhost:5670/api/v2/chat/completions";
Map<String, Object> chatNormalBody = new HashMap<>();
// 设置请求头
Map<String, String> headers = new HashMap<>();
headers.put("accept", "application/json");
headers.put("Content-Type", "application/json");
chatNormalBody.put("messages",message);
chatNormalBody.put("model",modelName);
chatNormalBody.put("stream",true);
chatNormalBody.put("temperature",0.5);
chatNormalBody.put("max_new_tokens",4000);
chatNormalBody.put("conv_uid", UUID.randomUUID().toString());
String chatNormalBodyJson = objectMapper.writeValueAsString(chatNormalBody);
// 发送POST请求
long startTime2 = System.nanoTime();
String resultData = okHttpUtil.postJson(chatNormalUrl, chatNormalBodyJson, headers);
return resultData;
}catch (Exception e){
return "暂无数据";
}
}
基于原有代码的最小改动方案
1. 首先,修改pom.xml添加必要依赖
<!-- 如果还没有OkHttp,添加这个依赖 -->
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>4.9.3</version>
</dependency>
2. 修改Controller接口(增加流式接口,原接口不变)
import org.springframework.http.MediaType;
import javax.servlet.http.HttpServletResponse;
import java.io.PrintWriter;
@Operation(summary = "问答问题查询接口(流式)")
@PostMapping(value = "/question/answer/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public void questionAnswerStream(
@Parameter(description = "用户问题", required = true) @RequestParam("message") String message,
@Parameter(description = "Admin-Token", required = true) @RequestHeader(required = false) String token,
HttpServletResponse response) throws IOException {
projectService.questionAnswerStream(message, token, response);
}
// 原有的同步接口保持不变
@Operation(summary = "问答问题查询接口")
@PostMapping(value = "/question/answer")
public String questionAnswer(
@Parameter(description = "用户问题", required = true) @RequestParam("message") String message,
@Parameter(description = "Admin-Token", required = true) @RequestHeader(required = false) String token) throws IOException {
return projectService.questionAnswer(message, token);
}
3. 修改Service方法(最简单的方式)
import okhttp3.*;
import com.fasterxml.jackson.databind.ObjectMapper;
public void questionAnswerStream(String message, String token, HttpServletResponse response) throws IOException {
response.setContentType("text/event-stream");
response.setCharacterEncoding("UTF-8");
response.setHeader("Cache-Control", "no-cache");
response.setHeader("Connection", "keep-alive");
PrintWriter writer = response.getWriter();
try {
ObjectMapper objectMapper = new ObjectMapper();
String chatNormalUrl = "http://localhost:5670/api/v2/chat/completions";
// 构建请求体
Map<String, Object> requestBody = new HashMap<>();
Map<String, String> userMessage = new HashMap<>();
userMessage.put("role", "user");
userMessage.put("content", message);
requestBody.put("messages", new Map[]{userMessage});
requestBody.put("model", "deepseek-ai/DeepSeek-R1");
requestBody.put("stream", true); // 这里改为true
requestBody.put("temperature", 0.5);
requestBody.put("max_new_tokens", 4000);
requestBody.put("conv_uid", UUID.randomUUID().toString());
String requestBodyJson = objectMapper.writeValueAsString(requestBody);
// 使用OkHttp进行流式调用
OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder()
.url(chatNormalUrl)
.post(RequestBody.create(requestBodyJson, MediaType.parse("application/json")))
.addHeader("accept", "application/json")
.addHeader("Content-Type", "application/json")
.build();
// 关键:使用流式响应
Response apiResponse = client.newCall(request).execute();
try (BufferedReader reader = new BufferedReader(new InputStreamReader(apiResponse.body().byteStream()))) {
String line;
while ((line = reader.readLine()) != null) {
if (line.startsWith("data: ")) {
// 直接转发给前端
writer.write(line + "\n\n");
writer.flush();
}
}
}
} catch (Exception e) {
// 发送错误信息
writer.write("data: {\"error\": \"" + e.getMessage() + "\"}\n\n");
writer.flush();
e.printStackTrace();
}
}
// 原有的同步方法保持不变
public String questionAnswer(String message, String token) {
try {
// 这里可以调用一个新的方法,或者复用部分逻辑,但stream=false
return callChatAPI(message, false);
} catch (Exception e) {
return "请求失败: " + e.getMessage();
}
}
// 如果想把流式和非流式逻辑统一,可以这样重构
private String callChatAPI(String message, boolean isStream) throws Exception {
ObjectMapper objectMapper = new ObjectMapper();
String chatNormalUrl = "http://localhost:5670/api/v2/chat/completions";
Map<String, Object> requestBody = new HashMap<>();
Map<String, String> userMessage = new HashMap<>();
userMessage.put("role", "user");
userMessage.put("content", message);
requestBody.put("messages", new Map[]{userMessage});
requestBody.put("model", "deepseek-ai/DeepSeek-R1");
requestBody.put("stream", isStream); // 根据参数决定是否流式
requestBody.put("temperature", 0.5);
requestBody.put("max_new_tokens", 4000);
requestBody.put("conv_uid", UUID.randomUUID().toString());
String requestBodyJson = objectMapper.writeValueAsString(requestBody);
Map<String, String> headers = new HashMap<>();
headers.put("accept", "application/json");
headers.put("Content-Type", "application/json");
if (isStream) {
// 流式逻辑 - 这里需要不同的处理,但暂时不实现
return "流式调用需使用特定方法";
} else {
// 非流式逻辑
String resultData = okHttpUtil.postJson(chatNormalUrl, requestBodyJson, headers);
// 解析结果
Map<String, Object> result = objectMapper.readValue(resultData, Map.class);
List<Map<String, Object>> choices = (List<Map<String, Object>>) result.get("choices");
if (choices != null && !choices.isEmpty()) {
Map<String, Object> messageObj = (Map<String, Object>) choices.get(0).get("message");
return (String) messageObj.get("content");
}
return "暂无数据";
}
}
4. 如果不想用OkHttp,想保留原有的okHttpUtil,可以用这种方式
public void questionAnswerStream(String message, String token, HttpServletResponse response) throws IOException {
response.setContentType("text/event-stream");
response.setCharacterEncoding("UTF-8");
response.setHeader("Cache-Control", "no-cache");
response.setHeader("Connection", "keep-alive");
PrintWriter writer = response.getWriter();
try {
// 使用Socket直接连接,这是最直接的流式处理方式
Socket socket = new Socket("localhost", 5670);
// 构建HTTP请求
String requestBody = buildRequestBody(message);
String httpRequest =
"POST /api/v2/chat/completions HTTP/1.1\r\n" +
"Host: localhost:5670\r\n" +
"Accept: application/json\r\n" +
"Content-Type: application/json\r\n" +
"Content-Length: " + requestBody.length() + "\r\n" +
"Connection: close\r\n" +
"\r\n" +
requestBody;
// 发送请求
OutputStream out = socket.getOutputStream();
out.write(httpRequest.getBytes(StandardCharsets.UTF_8));
out.flush();
// 读取响应
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
// 跳过HTTP响应头
String line;
while ((line = reader.readLine()) != null && !line.isEmpty()) {
// 跳过头部
}
// 读取SSE数据
while ((line = reader.readLine()) != null) {
if (line.startsWith("data: ")) {
writer.write(line + "\n\n");
writer.flush();
if (line.equals("data: [DONE]")) {
break;
}
}
}
socket.close();
} catch (Exception e) {
writer.write("data: {\"error\": \"" + e.getMessage() + "\"}\n\n");
writer.flush();
e.printStackTrace();
}
}
private String buildRequestBody(String message) throws Exception {
ObjectMapper objectMapper = new ObjectMapper();
Map<String, Object> requestBody = new HashMap<>();
Map<String, String> userMessage = new HashMap<>();
userMessage.put("role", "user");
userMessage.put("content", message);
requestBody.put("messages", new Map[]{userMessage});
requestBody.put("model", "deepseek-ai/DeepSeek-R1");
requestBody.put("stream", true);
requestBody.put("temperature", 0.5);
requestBody.put("max_new_tokens", 4000);
requestBody.put("conv_uid", UUID.randomUUID().toString());
return objectMapper.writeValueAsString(requestBody);
}
5. 最简单的测试版本(先验证能否流式)
public void questionAnswerStream(String message, String token, HttpServletResponse response) throws IOException {
response.setContentType("text/event-stream");
response.setCharacterEncoding("UTF-8");
PrintWriter writer = response.getWriter();
// 先模拟流式输出,验证前端能收到
writer.write("data: {\"choices\":[{\"delta\":{\"content\":\"正在处理您的问题...\\n\"}}]}\n\n");
writer.flush();
Thread.sleep(1000);
writer.write("data: {\"choices\":[{\"delta\":{\"content\":\"这是流式输出的第一段内容。\\n\"}}]}\n\n");
writer.flush();
Thread.sleep(1000);
writer.write("data: {\"choices\":[{\"delta\":{\"content\":\"这是第二段内容。\\n\"}}]}\n\n");
writer.flush();
Thread.sleep(1000);
writer.write("data: [DONE]\n\n");
writer.flush();
}
6. 前端调用(最简单的HTML页面)
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>测试流式输出</title>
</head>
<body>
<h2>测试流式问答</h2>
<input type="text" id="question" placeholder="输入问题">
<button onclick="askQuestion()">提问</button>
<div id="answer" style="border:1px solid #ccc; padding:10px; margin-top:10px; min-height:100px;"></div>
<script>
function askQuestion() {
const question = document.getElementById('question').value;
const answerDiv = document.getElementById('answer');
answerDiv.innerHTML = '';
// 使用EventSource接收流式数据
const eventSource = new EventSource(`/api/question/answer/stream?message=${encodeURIComponent(question)}`);
eventSource.onmessage = function(event) {
const data = event.data;
if (data === '[DONE]') {
eventSource.close();
return;
}
try {
const json = JSON.parse(data);
if (json.choices && json.choices[0].delta && json.choices[0].delta.content) {
answerDiv.innerHTML += json.choices[0].delta.content;
}
} catch (e) {
console.error('解析错误:', e);
}
};
eventSource.onerror = function(error) {
console.error('连接错误:', error);
eventSource.close();
};
}
</script>
</body>
</html>
关键改动总结:
- 新增一个流式接口:使用
produces = MediaType.TEXT_EVENT_STREAM_VALUE - 修改HttpServletResponse:设置SSE相关响应头
- 保持原有接口不变:不影响现有调用
- 使用PrintWriter流式输出:最简单的流式输出方式
- 前端使用EventSource接收:最简洁的前端实现
如果希望更简单地集成,这里是最简单的方案:
@PostMapping(value = "/question/answer/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public void questionAnswerStream(
@RequestParam("message") String message,
@RequestHeader(required = false) String token,
HttpServletResponse response) throws IOException {
response.setContentType("text/event-stream");
response.setCharacterEncoding("UTF-8");
PrintWriter writer = response.getWriter();
// 直接调用外部命令(如果API支持命令行调用)
try {
// 构建curl命令调用DeepSeek API
String[] cmd = {
"curl", "-X", "POST",
"http://localhost:5670/api/v2/chat/completions",
"-H", "Content-Type: application/json",
"-H", "accept: application/json",
"-d", String.format("{\"messages\":[{\"role\":\"user\",\"content\":\"%s\"}],\"model\":\"deepseek-ai/DeepSeek-R1\",\"stream\":true}", message)
};
Process process = Runtime.getRuntime().exec(cmd);
BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
String line;
while ((line = reader.readLine()) != null) {
writer.write(line + "\n\n");
writer.flush();
}
process.waitFor();
} catch (Exception e) {
writer.write("data: {\"error\": \"" + e.getMessage() + "\"}\n\n");
writer.flush();
}
}
只需要新增一个接口,就可以实现最直接的流式输出方式
集成到现有OkHttpUtil工具类的流式输出方案
作为资深专家,我会充分复用现有的工具类。以下是基于现有代码的最小改动方案:
1. 首先,在OkHttpUtil中添加流式处理方法
@Slf4j
@Component
public class OkHttpUtil {
private static final OkHttpClient client = new OkHttpClient.Builder()
.connectTimeout(60, TimeUnit.SECONDS)
.readTimeout(60, TimeUnit.MINUTES)
.writeTimeout(30, TimeUnit.MINUTES)
.callTimeout(90, TimeUnit.MINUTES)
.build();
/**
* 流式POST请求 - 专为SSE设计
*/
public void postJsonStream(String url, String json, Map<String, String> headers,
Consumer<String> lineConsumer) throws IOException {
Request.Builder builder = new Request.Builder()
.url(url)
.post(RequestBody.create(json, okhttp3.MediaType.parse("application/json")));
if (headers != null) {
headers.forEach(builder::addHeader);
}
Request request = builder.build();
try (Response response = client.newCall(request).execute()) {
if (!response.isSuccessful()) {
throw new IOException("请求失败: " + response.code());
}
ResponseBody body = response.body();
if (body == null) {
return;
}
try (BufferedReader reader = new BufferedReader(new InputStreamReader(body.byteStream()))) {
String line;
while ((line = reader.readLine()) != null) {
lineConsumer.accept(line);
}
}
}
}
// 原有的同步方法保持不变
public String postJson(String url, String json, Map<String, String> headers) throws IOException {
// ... 原有的实现 ...
return null;
}
}
2. 修改Service方法(最小改动)
import javax.servlet.http.HttpServletResponse;
import java.io.PrintWriter;
public void questionAnswerStream(String message, String token, HttpServletResponse response) throws IOException {
// 设置SSE响应头
response.setContentType("text/event-stream");
response.setCharacterEncoding("UTF-8");
response.setHeader("Cache-Control", "no-cache");
response.setHeader("Connection", "keep-alive");
PrintWriter writer = response.getWriter();
try {
// 复用原有的请求构建逻辑
ObjectMapper objectMapper = new ObjectMapper();
// 构建请求体 - 和原有代码一模一样,只是stream=true
Map<String, Object> chatNormalBody = new HashMap<>();
// 构建messages - 注意这里需要是数组
List<Map<String, String>> messages = new ArrayList<>();
Map<String, String> userMessage = new HashMap<>();
userMessage.put("role", "user");
userMessage.put("content", message);
messages.add(userMessage);
chatNormalBody.put("messages", messages);
chatNormalBody.put("model", "deepseek-ai/DeepSeek-R1");
chatNormalBody.put("stream", true); // 关键:改为true
chatNormalBody.put("temperature", 0.5);
chatNormalBody.put("max_new_tokens", 4000);
chatNormalBody.put("conv_uid", UUID.randomUUID().toString());
String chatNormalBodyJson = objectMapper.writeValueAsString(chatNormalBody);
// 构建请求头
Map<String, String> headers = new HashMap<>();
headers.put("accept", "application/json");
headers.put("Content-Type", "application/json");
// 关键:使用新的流式方法调用
okHttpUtil.postJsonStream(
"http://localhost:5670/api/v2/chat/completions",
chatNormalBodyJson,
headers,
line -> {
// 处理每一行流式数据
if (line.startsWith("data: ")) {
try {
writer.write(line + "\n\n");
writer.flush();
// 如果收到结束标志,可以在这里做额外处理
if (line.equals("data: [DONE]")) {
log.info("流式输出完成");
}
} catch (Exception e) {
log.error("写入响应失败", e);
}
}
}
);
} catch (Exception e) {
log.error("流式请求失败", e);
// 发送错误信息给前端
try {
Map<String, String> error = new HashMap<>();
error.put("error", e.getMessage());
String errorJson = objectMapper.writeValueAsString(error);
writer.write("data: " + errorJson + "\n\n");
writer.flush();
} catch (Exception ex) {
// 忽略
}
}
}
// 原有的同步方法完全保持不变
public String questionAnswer(String message, String token) {
String chatNormalResult = this.chatNormal(message, "deepseek-ai/DeepSeek-R1");
return chatNormalResult;
}
// 原有的chatNormal方法完全保持不变
public String chatNormal(String message, String modelName) {
try {
long startTime = System.nanoTime();
ObjectMapper objectMapper = new ObjectMapper();
String chatNormalUrl = "http://localhost:5670/api/v2/chat/completions";
Map<String, Object> chatNormalBody = new HashMap<>();
Map<String, String> headers = new HashMap<>();
headers.put("accept", "application/json");
headers.put("Content-Type", "application/json");
// 注意:这里messages应该是数组,看原有代码怎么写的
// 假设原有代码已经处理好了
chatNormalBody.put("messages", message); // 这里可能需要调整
chatNormalBody.put("model", modelName);
chatNormalBody.put("stream", false); // 同步请求不流式
chatNormalBody.put("temperature", 0.5);
chatNormalBody.put("max_new_tokens", 4000);
chatNormalBody.put("conv_uid", UUID.randomUUID().toString());
String chatNormalBodyJson = objectMapper.writeValueAsString(chatNormalBody);
String resultData = okHttpUtil.postJson(chatNormalUrl, chatNormalBodyJson, headers);
return resultData;
} catch (Exception e) {
return "暂无数据";
}
}
3. 或者,更简单的方式 - 直接修改chatNormal方法支持流式
如果想让代码更统一,可以这样修改:
// 在Service类中添加这个方法
public void chatNormalStream(String message, String modelName, PrintWriter writer) {
try {
ObjectMapper objectMapper = new ObjectMapper();
// 构建messages
List<Map<String, String>> messages = new ArrayList<>();
Map<String, String> userMessage = new HashMap<>();
userMessage.put("role", "user");
userMessage.put("content", message);
messages.add(userMessage);
// 构建请求体
Map<String, Object> requestBody = new HashMap<>();
requestBody.put("messages", messages);
requestBody.put("model", modelName);
requestBody.put("stream", true);
requestBody.put("temperature", 0.5);
requestBody.put("max_new_tokens", 4000);
requestBody.put("conv_uid", UUID.randomUUID().toString());
String jsonBody = objectMapper.writeValueAsString(requestBody);
// 请求头
Map<String, String> headers = new HashMap<>();
headers.put("accept", "application/json");
headers.put("Content-Type", "application/json");
// 使用OkHttp直接调用(不通过工具类,因为需要流式处理)
OkHttpClient client = new OkHttpClient();
okhttp3.Request request = new okhttp3.Request.Builder()
.url("http://localhost:5670/api/v2/chat/completions")
.post(okhttp3.RequestBody.create(
jsonBody,
okhttp3.MediaType.parse("application/json")
))
.headers(okhttp3.Headers.of(headers))
.build();
try (okhttp3.Response response = client.newCall(request).execute()) {
if (response.isSuccessful()) {
ResponseBody body = response.body();
if (body != null) {
BufferedReader reader = new BufferedReader(
new InputStreamReader(body.byteStream())
);
String line;
while ((line = reader.readLine()) != null) {
// 直接转发SSE格式数据
if (line.startsWith("data: ")) {
writer.write(line + "\n\n");
writer.flush();
}
}
}
}
}
} catch (Exception e) {
log.error("流式请求失败", e);
try {
writer.write("data: {\"error\": \"" + e.getMessage() + "\"}\n\n");
writer.flush();
} catch (Exception ex) {
// 忽略
}
}
}
4. 最简化的Controller调用
@Operation(summary = "问答问题查询接口 - 流式")
@PostMapping(value = "/question/answer/stream", produces = "text/event-stream;charset=UTF-8")
public void questionAnswerStream(
@RequestParam("message") String message,
@RequestHeader(required = false) String token,
HttpServletResponse response) throws IOException {
// 设置SSE响应头
response.setContentType("text/event-stream");
response.setCharacterEncoding("UTF-8");
response.setHeader("Cache-Control", "no-cache");
response.setHeader("Connection", "keep-alive");
// 直接调用
projectService.chatNormalStream(message, "deepseek-ai/DeepSeek-R1", response.getWriter());
}
5. 如果不想修改OkHttpUtil,最简单的版本
// 直接在Service中处理流式请求
public void questionAnswerStream(String message, String token, HttpServletResponse response) throws IOException {
response.setContentType("text/event-stream");
response.setCharacterEncoding("UTF-8");
PrintWriter writer = response.getWriter();
try {
// 复用原有的OkHttpClient配置
OkHttpClient client = OkHttpUtil.getClient(); // 如果client是public或提供getter
// 构建请求体(和原有代码一致)
ObjectMapper mapper = new ObjectMapper();
Map<String, Object> requestBody = new HashMap<>();
// 构建messages数组
List<Map<String, String>> messages = new ArrayList<>();
Map<String, String> userMsg = new HashMap<>();
userMsg.put("role", "user");
userMsg.put("content", message);
messages.add(userMsg);
requestBody.put("messages", messages);
requestBody.put("model", "deepseek-ai/DeepSeek-R1");
requestBody.put("stream", true);
requestBody.put("temperature", 0.5);
requestBody.put("max_new_tokens", 4000);
requestBody.put("conv_uid", UUID.randomUUID().toString());
String json = mapper.writeValueAsString(requestBody);
// 构建请求
okhttp3.Request request = new okhttp3.Request.Builder()
.url("http://localhost:5670/api/v2/chat/completions")
.post(okhttp3.RequestBody.create(json,
okhttp3.MediaType.parse("application/json; charset=utf-8")))
.addHeader("accept", "application/json")
.addHeader("Content-Type", "application/json")
.build();
// 执行请求并处理流式响应
try (okhttp3.Response apiResponse = client.newCall(request).execute()) {
if (apiResponse.isSuccessful()) {
ResponseBody body = apiResponse.body();
if (body != null) {
// 逐行读取流式响应
BufferedReader reader = new BufferedReader(
new InputStreamReader(body.byteStream())
);
String line;
while ((line = reader.readLine()) != null) {
if (line.startsWith("data: ")) {
// 直接转发给前端
writer.write(line + "\n\n");
writer.flush();
}
}
}
}
}
} catch (Exception e) {
log.error("流式请求异常", e);
writer.write("data: {\"error\": \"服务异常: " + e.getMessage() + "\"}\n\n");
writer.flush();
}
}
6. 前端调用(极简版)
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>流式问答</title>
</head>
<body>
<input type="text" id="question" placeholder="输入问题">
<button onclick="ask()">提问</button>
<div id="answer" style="white-space: pre-wrap; border:1px solid #ccc; padding:10px; margin-top:10px;"></div>
<script>
function ask() {
const question = document.getElementById('question').value;
const answerDiv = document.getElementById('answer');
answerDiv.innerHTML = '';
// 使用EventSource接收流式数据
const es = new EventSource(`/api/question/answer/stream?message=${encodeURIComponent(question)}`);
es.onmessage = function(e) {
const data = e.data;
if (data === '[DONE]') {
es.close();
return;
}
try {
const json = JSON.parse(data);
if (json.choices && json.choices[0].delta && json.choices[0].delta.content) {
answerDiv.innerHTML += json.choices[0].delta.content;
}
} catch(err) {
// 忽略解析错误
}
};
es.onerror = function() {
es.close();
};
}
</script>
</body>
</html>
到此这篇关于java调用第三方接口实现流式输出的示例代码的文章就介绍到这了,更多相关java 流式输出内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
