springboot结合vue实现sse接口详细流程
作者:IT界Tony哥
这篇文章主要为大家详细介绍了 Spring Boot 结合 Vue 实现 SSE 接口的详细流程,文中的示例代码讲解详细,具有一定的借鉴价值,感兴趣的小伙伴可以了解一下。
1. Spring Boot 后端实现
1.1 添加依赖
<!-- pom.xml -->
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
1.2 SSE控制器
// SseController.java
@RestController
@RequestMapping("/api/sse")
@CrossOrigin(origins = "*") // 允许跨域
public class SseController {
private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
/**
* 建立SSE连接
*/
@GetMapping("/connect/{clientId}")
public SseEmitter connect(@PathVariable String clientId) {
// 设置超时时间(0表示永不超时)
SseEmitter emitter = new SseEmitter(0L);
// 注册到Map中
emitters.put(clientId, emitter);
// 连接成功回调
emitter.onCompletion(() -> {
System.out.println("SSE连接完成: " + clientId);
emitters.remove(clientId);
});
emitter.onTimeout(() -> {
System.out.println("SSE连接超时: " + clientId);
emitters.remove(clientId);
});
emitter.onError((ex) -> {
System.out.println("SSE连接错误: " + clientId + ", 错误: " + ex.getMessage());
emitters.remove(clientId);
});
try {
// 发送连接成功的消息
emitter.send(SseEmitter.event()
.name("connect")
.data("连接成功")
.id(String.valueOf(System.currentTimeMillis())));
} catch (IOException e) {
emitter.completeWithError(e);
}
return emitter;
}
/**
* 广播消息给所有客户端
*/
@PostMapping("/broadcast")
public ResponseEntity<String> broadcast(@RequestBody MessageDTO message) {
sendToAllClients(message);
return ResponseEntity.ok("广播成功");
}
/**
* 发送消息给指定客户端
*/
@PostMapping("/send/{clientId}")
public ResponseEntity<String> sendMessage(
@PathVariable String clientId,
@RequestBody MessageDTO message) {
boolean success = sendToClient(clientId, message);
if (success) {
return ResponseEntity.ok("发送成功");
} else {
return ResponseEntity.status(HttpStatus.NOT_FOUND).body("客户端不存在");
}
}
/**
* 获取在线客户端数量
*/
@GetMapping("/online")
public ResponseEntity<Map<String, Object>> getOnlineCount() {
Map<String, Object> result = new HashMap<>();
result.put("onlineCount", emitters.size());
result.put("clientIds", new ArrayList<>(emitters.keySet()));
return ResponseEntity.ok(result);
}
/**
* 关闭指定客户端的连接
*/
@DeleteMapping("/close/{clientId}")
public ResponseEntity<String> closeConnection(@PathVariable String clientId) {
SseEmitter emitter = emitters.get(clientId);
if (emitter != null) {
emitter.complete();
emitters.remove(clientId);
return ResponseEntity.ok("连接已关闭");
}
return ResponseEntity.status(HttpStatus.NOT_FOUND).body("客户端不存在");
}
/**
* 向所有客户端发送消息
*/
private void sendToAllClients(MessageDTO message) {
List<String> deadClients = new ArrayList<>();
emitters.forEach((clientId, emitter) -> {
try {
emitter.send(SseEmitter.event()
.name(message.getType())
.data(message.getContent())
.id(String.valueOf(System.currentTimeMillis()))
.reconnectTime(3000)); // 重连时间
} catch (IOException e) {
deadClients.add(clientId);
emitter.complete();
}
});
// 清理无效连接
deadClients.forEach(emitters::remove);
}
/**
* 向指定客户端发送消息
*/
private boolean sendToClient(String clientId, MessageDTO message) {
SseEmitter emitter = emitters.get(clientId);
if (emitter != null) {
try {
emitter.send(SseEmitter.event()
.name(message.getType())
.data(message.getContent())
.id(String.valueOf(System.currentTimeMillis())));
return true;
} catch (IOException e) {
emitter.complete();
emitters.remove(clientId);
}
}
return false;
}
}
1.3 消息DTO类
// MessageDTO.java
/**
 * Mutable transfer object describing one message pushed over SSE.
 */
public class MessageDTO {

    /** Message type. */
    private String type;

    /** Message content. */
    private String content;

    /** Optional additional data attached to the message. */
    private Object data;

    /** Creates an empty message; all fields start as {@code null}. */
    public MessageDTO() {
        this(null, null, null);
    }

    /**
     * Creates a message without additional data.
     *
     * @param type    message type
     * @param content message content
     */
    public MessageDTO(String type, String content) {
        this(type, content, null);
    }

    /**
     * Creates a fully populated message.
     *
     * @param type    message type
     * @param content message content
     * @param data    additional data
     */
    public MessageDTO(String type, String content, Object data) {
        this.type = type;
        this.content = content;
        this.data = data;
    }

    public String getType() {
        return this.type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public String getContent() {
        return this.content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    public Object getData() {
        return this.data;
    }

    public void setData(Object data) {
        this.data = data;
    }
}
1.4 配置类(可选)
// WebConfig.java
/**
 * Optional MVC configuration that sets the default timeout for asynchronous requests.
 */
@Configuration
public class WebConfig implements WebMvcConfigurer {

    /** Default async timeout; 0 is intended to mean that async requests do not time out. */
    private static final long ASYNC_TIMEOUT_DISABLED = 0L;

    @Override
    public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
        configurer.setDefaultTimeout(ASYNC_TIMEOUT_DISABLED);
    }
}
2. Vue 前端实现
2.1 安装依赖(可选)
npm install event-source-polyfill # 如果需要兼容旧浏览器
2.2 SSE服务类
// src/services/sseService.js
// Names that are either produced by this service itself or belong to
// EventSource's built-in lifecycle. They must not be bound as server-named
// SSE events.
const RESERVED_EVENT_NAMES = new Set([
  'open',
  'error',
  'message',
  'connected',
  'disconnected',
  'raw-message'
]);

// Delay before a manual reconnect once the browser has given up on the stream.
const RECONNECT_DELAY_MS = 3000;

/**
 * Thin wrapper around EventSource with a small publish/subscribe layer.
 *
 * Events emitted by the service itself: 'connected', 'disconnected', 'error',
 * 'message' (unnamed server events) and 'raw-message' (unnamed events whose
 * payload is not JSON). Any other name passed to on() is treated as a
 * server-named SSE event and is bound on the underlying EventSource, because
 * named events are not delivered through onmessage.
 */
class SSEService {
  constructor() {
    this.eventSource = null;
    this.clientId = null;
    this.listeners = new Map();
    // Server event names already bound on the current EventSource.
    this.boundServerEvents = new Set();
    // Handle of the pending manual reconnect, if any.
    this.reconnectTimer = null;
  }

  /**
   * Open the SSE connection.
   * @param {string} clientId client id
   * @param {string} baseURL base URL of the backend
   * @returns {SSEService} this, for chaining
   */
  connect(clientId, baseURL = 'http://localhost:8080') {
    if (this.eventSource) {
      this.disconnect();
    }
    this.cancelReconnect();
    this.clientId = clientId;
    // Encode the id so characters such as spaces or slashes stay in one path segment.
    const url = `${baseURL}/api/sse/connect/${encodeURIComponent(clientId)}`;
    const source = new EventSource(url);
    this.eventSource = source;
    this.boundServerEvents = new Set();

    // Connection opened (also fires after a browser-driven reconnect).
    source.onopen = (event) => {
      console.log('SSE连接已建立');
      this.emit('connected', event);
    };

    // Unnamed events (or events explicitly named "message").
    source.onmessage = (event) => {
      try {
        const data = JSON.parse(event.data);
        this.emit('message', data);
        // Dispatch by the name carried inside the payload, if any.
        if (data.name) {
          this.emit(data.name, data.data || data);
        }
      } catch (error) {
        console.error('解析SSE消息失败:', error);
        this.emit('raw-message', event.data);
      }
    };

    source.onerror = (event) => {
      console.error('SSE连接错误:', event);
      this.emit('error', event);
      // CLOSED means the browser will not retry on its own; reconnect manually.
      // The identity check ignores errors from a source that was already replaced.
      if (this.eventSource === source && source.readyState === EventSource.CLOSED) {
        console.log('SSE连接已关闭,尝试重连...');
        this.eventSource = null;
        this.emit('disconnected');
        this.reconnectTimer = setTimeout(() => {
          this.reconnectTimer = null;
          this.connect(this.clientId, baseURL);
        }, RECONNECT_DELAY_MS);
      }
    };

    // Bind every server event name that was subscribed before connecting.
    for (const eventName of this.listeners.keys()) {
      this.bindServerEvent(eventName);
    }
    return this;
  }

  /**
   * Add an event listener.
   * @param {string} eventName event name
   * @param {function} callback callback
   * @returns {SSEService} this, for chaining
   */
  on(eventName, callback) {
    if (!this.listeners.has(eventName)) {
      this.listeners.set(eventName, []);
    }
    this.listeners.get(eventName).push(callback);
    this.bindServerEvent(eventName);
    return this;
  }

  /**
   * Remove an event listener.
   * @param {string} eventName event name
   * @param {function} callback callback previously passed to on()
   * @returns {SSEService} this, for chaining
   */
  off(eventName, callback) {
    if (this.listeners.has(eventName)) {
      const callbacks = this.listeners.get(eventName);
      const index = callbacks.indexOf(callback);
      if (index > -1) {
        callbacks.splice(index, 1);
      }
    }
    return this;
  }

  /**
   * Invoke all listeners of an event; a throwing listener does not stop the others.
   * @param {string} eventName event name
   * @param {any} data payload
   */
  emit(eventName, data) {
    if (this.listeners.has(eventName)) {
      this.listeners.get(eventName).forEach(callback => {
        try {
          callback(data);
        } catch (error) {
          console.error(`执行事件${eventName}的回调时出错:`, error);
        }
      });
    }
  }

  /**
   * Forward a server-named SSE event from the current EventSource to emit().
   * Does nothing when not connected, for reserved names, or when already bound.
   * @param {string} eventName event name
   */
  bindServerEvent(eventName) {
    if (!this.eventSource
      || RESERVED_EVENT_NAMES.has(eventName)
      || this.boundServerEvents.has(eventName)) {
      return;
    }
    this.boundServerEvents.add(eventName);
    this.eventSource.addEventListener(eventName, (event) => {
      this.emit(eventName, this.parseData(event.data));
    });
  }

  /**
   * Parse a payload as JSON, falling back to the raw text when it is not JSON.
   * @param {string} raw raw payload
   * @returns {any} parsed value or the raw text
   */
  parseData(raw) {
    try {
      return JSON.parse(raw);
    } catch (error) {
      return raw;
    }
  }

  /**
   * Cancel a pending manual reconnect, if any.
   */
  cancelReconnect() {
    if (this.reconnectTimer !== null) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
  }

  /**
   * Send a message to the server.
   * @param {object} message message object
   * @param {string} endpoint endpoint below /api/sse/
   * @param {string} baseURL base URL of the backend
   * @returns {Promise<any>} response body, parsed as JSON when possible, otherwise text
   * @throws {Error} on network failure or a non-2xx response
   */
  async sendMessage(message, endpoint = 'broadcast', baseURL = 'http://localhost:8080') {
    try {
      const url = `${baseURL}/api/sse/${endpoint}`;
      const options = {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
        },
        body: JSON.stringify(message)
      };
      const response = await fetch(url, options);
      // Read as text first: the body may be plain text rather than JSON.
      const text = await response.text();
      if (!response.ok) {
        throw new Error(text || `HTTP ${response.status}`);
      }
      return this.parseData(text);
    } catch (error) {
      console.error('发送消息失败:', error);
      throw error;
    }
  }

  /**
   * Send a message to one client.
   * @param {string} targetClientId target client id
   * @param {object} message message object
   * @param {string} baseURL base URL of the backend
   */
  async sendToClient(targetClientId, message, baseURL = 'http://localhost:8080') {
    return this.sendMessage(message, `send/${encodeURIComponent(targetClientId)}`, baseURL);
  }

  /**
   * Close the connection and cancel any pending reconnect.
   */
  disconnect() {
    this.cancelReconnect();
    if (this.eventSource) {
      this.eventSource.close();
      this.eventSource = null;
      console.log('SSE连接已断开');
      this.emit('disconnected');
    }
  }

  /**
   * Whether the connection is currently open.
   * @returns {boolean}
   */
  isConnected() {
    return !!this.eventSource && this.eventSource.readyState === EventSource.OPEN;
  }
}
// Create the single shared instance; every importer of this module gets the same object.
const sseService = new SSEService();
export default sseService;
2.3 Vue组件中使用
<!-- SSEDemo.vue -->
<template>
<div class="sse-demo">
<h2>SSE实时通信演示</h2>
<!-- Connection controls: client id input, connect/disconnect buttons, status badge -->
<div class="connection-panel">
<input v-model="clientId" placeholder="输入客户端ID" />
<button @click="connect" :disabled="isConnected">连接</button>
<button @click="disconnect" :disabled="!isConnected">断开</button>
<span class="status" :class="{ connected: isConnected }">
{{ isConnected ? '已连接' : '未连接' }}
</span>
</div>
<!-- Message sending: type selector, content input, broadcast / send-to-self buttons -->
<div class="message-panel">
<h3>发送消息</h3>
<select v-model="messageType">
<option value="chat">聊天</option>
<option value="notification">通知</option>
<option value="system">系统</option>
</select>
<input v-model="messageContent" placeholder="输入消息内容" />
<button @click="sendBroadcast" :disabled="!isConnected">广播</button>
<button @click="sendToSelf" :disabled="!isConnected">发送给自己</button>
</div>
<!-- Online information: refresh button and online count -->
<div class="online-info">
<button @click="getOnlineInfo" :disabled="!isConnected">刷新在线信息</button>
<p>在线人数: {{ onlineInfo.onlineCount || 0 }}</p>
</div>
<!-- Received messages, rendered from the messages array -->
<div class="messages">
<h3>收到的消息</h3>
<div class="message-list">
<div v-for="(msg, index) in messages" :key="index"
class="message-item" :class="msg.type">
<span class="time">{{ formatTime(msg.timestamp) }}</span>
<span class="type">[{{ msg.type }}]</span>
<span class="content">{{ msg.content }}</span>
</div>
</div>
</div>
</div>
</template>
<script>
import sseService from '@/services/sseService';
/**
 * Demo component for the SSE service: connects under a client id, sends
 * broadcast / targeted messages and lists the events it receives.
 */
export default {
  name: 'SSEDemo',
  data() {
    return {
      clientId: '',
      messageType: 'chat',
      messageContent: '',
      isConnected: false,
      messages: [],
      onlineInfo: {}
    };
  },
  mounted() {
    // Generate a random client id (9 base-36 characters after the "0." prefix).
    this.clientId = 'client_' + Math.random().toString(36).slice(2, 11);
    // Subscribe to the service events.
    this.registerEventListeners();
  },
  beforeUnmount() {
    // The service is a shared singleton: remove this component's handlers so
    // they neither leak nor fire twice after the component is mounted again.
    this.unregisterEventListeners();
    // Close the connection before the component goes away.
    sseService.disconnect();
  },
  methods: {
    /**
     * Subscribe to the service events. The handlers are kept on the instance
     * so that unregisterEventListeners() can remove exactly these functions.
     */
    registerEventListeners() {
      // Guard against double registration.
      this.unregisterEventListeners();

      // Builds a handler that lists the payload under the given message type.
      const listAs = (type) => (data) => {
        this.addMessage(type, typeof data === 'string' ? data : JSON.stringify(data));
      };

      this.sseHandlers = {
        // Connection established
        connected: () => {
          this.isConnected = true;
          this.addMessage('system', '连接成功');
        },
        // Connection closed
        disconnected: () => {
          this.isConnected = false;
          this.addMessage('system', '连接已断开');
        },
        // Generic messages
        message: (data) => {
          this.addMessage('message', JSON.stringify(data));
        },
        // Messages of a specific type
        chat: listAs('chat'),
        notification: listAs('notification'),
        system: listAs('system'),
        // Errors
        error: (error) => {
          console.error('SSE错误:', error);
          // Re-read the real state so the UI does not keep showing "connected".
          this.isConnected = Boolean(sseService.isConnected());
          this.addMessage('system', '连接错误');
        }
      };

      Object.entries(this.sseHandlers).forEach(([eventName, handler]) => {
        sseService.on(eventName, handler);
      });
    },
    /**
     * Remove every handler added by registerEventListeners().
     */
    unregisterEventListeners() {
      if (!this.sseHandlers) {
        return;
      }
      Object.entries(this.sseHandlers).forEach(([eventName, handler]) => {
        sseService.off(eventName, handler);
      });
      this.sseHandlers = null;
    },
    connect() {
      if (!this.clientId.trim()) {
        alert('请输入客户端ID');
        return;
      }
      sseService.connect(this.clientId);
    },
    disconnect() {
      sseService.disconnect();
    },
    /** Broadcast the current input to all clients; clears the input on success. */
    async sendBroadcast() {
      if (!this.messageContent.trim()) {
        alert('请输入消息内容');
        return;
      }
      try {
        await sseService.sendMessage({
          type: this.messageType,
          content: this.messageContent
        });
        this.messageContent = '';
      } catch (error) {
        alert('发送失败: ' + error.message);
      }
    },
    /** Send the current input to this client only; clears the input on success. */
    async sendToSelf() {
      if (!this.messageContent.trim()) {
        alert('请输入消息内容');
        return;
      }
      try {
        await sseService.sendToClient(this.clientId, {
          type: this.messageType,
          content: `[自发送] ${this.messageContent}`
        });
        this.messageContent = '';
      } catch (error) {
        alert('发送失败: ' + error.message);
      }
    },
    /** Fetch the online count / client ids; failures are only logged. */
    async getOnlineInfo() {
      try {
        const response = await fetch('http://localhost:8080/api/sse/online');
        if (!response.ok) {
          // Do not overwrite onlineInfo with an error body.
          throw new Error(`HTTP ${response.status}`);
        }
        this.onlineInfo = await response.json();
      } catch (error) {
        console.error('获取在线信息失败:', error);
      }
    },
    /** Prepend a message to the list, keeping at most 50 entries. */
    addMessage(type, content) {
      this.messages.unshift({
        type,
        content,
        timestamp: new Date()
      });
      // Cap the list: drop the oldest entry.
      if (this.messages.length > 50) {
        this.messages.pop();
      }
    },
    formatTime(timestamp) {
      return new Date(timestamp).toLocaleTimeString();
    }
  }
};
</script>
<style scoped>
/* Page container: centered column */
.sse-demo {
max-width: 800px;
margin: 0 auto;
padding: 20px;
}
/* Shared card look for the three control panels */
.connection-panel, .message-panel, .online-info {
margin-bottom: 20px;
padding: 15px;
border: 1px solid #ddd;
border-radius: 5px;
}
/* Form controls inside the panels */
.connection-panel input, .message-panel input, .message-panel select {
margin-right: 10px;
padding: 5px 10px;
}
button {
padding: 5px 15px;
margin-right: 10px;
cursor: pointer;
}
button:disabled {
cursor: not-allowed;
opacity: 0.5;
}
/* Connection status badge; turns green when the "connected" class is set */
.status {
padding: 2px 8px;
border-radius: 3px;
}
.status.connected {
background-color: #4CAF50;
color: white;
}
/* Received-messages card with a fixed-height scrolling list */
.messages {
border: 1px solid #ddd;
border-radius: 5px;
padding: 15px;
}
.message-list {
height: 300px;
overflow-y: auto;
border: 1px solid #eee;
padding: 10px;
}
.message-item {
margin-bottom: 10px;
padding: 5px;
border-radius: 3px;
}
/* Background color per message type (the type is applied as a class name) */
.message-item.chat {
background-color: #e3f2fd;
}
.message-item.notification {
background-color: #fff3e0;
}
.message-item.system {
background-color: #f3e5f5;
}
/* Parts of a single message row */
.time {
font-size: 12px;
color: #666;
margin-right: 10px;
}
.type {
font-weight: bold;
margin-right: 10px;
}
</style>
3. 高级特性示例
3.1 带认证的SSE连接
// 在Controller中添加认证
/**
 * Opens an SSE connection after validating the caller's token.
 *
 * <p>NOTE: the browser's native {@code EventSource} API cannot set custom request headers, so a
 * browser client needs a polyfill that supports headers (for example event-source-polyfill) or
 * another way to pass the credential for this check to succeed.
 *
 * @param clientId identifier the emitter is registered under
 * @param token    value of the {@code Authorization} header, or {@code null} when absent
 * @return the emitter backing the new connection
 */
@GetMapping("/connect/{clientId}")
public SseEmitter connect(@PathVariable String clientId,
        @RequestHeader(value = "Authorization", required = false) String token) {
    // Token validation: answer 401 explicitly instead of letting an unhandled
    // SecurityException surface as a generic server error.
    if (!validateToken(token)) {
        throw new org.springframework.web.server.ResponseStatusException(
                HttpStatus.UNAUTHORIZED, "未授权访问");
    }
    // ... the rest of the method is the same as the unauthenticated connect()
}
3.2 心跳检测
// 定期发送心跳
/**
 * Periodically pushes a heartbeat event to every connected SSE client.
 *
 * <p>NOTE: {@code @Scheduled} methods only run when scheduling is enabled in the application
 * (for example via {@code @EnableScheduling} on a configuration class) — confirm this is set up.
 */
@Component
public class HeartbeatScheduler {

    @Autowired
    private SseController sseController;

    /** Sends a "heartbeat" event with the payload "ping" every 30 seconds. */
    @Scheduled(fixedRate = 30000)
    public void sendHeartbeat() {
        MessageDTO heartbeat = new MessageDTO("heartbeat", "ping");
        // Go through the controller's public broadcast entry point: sendToAllClients is
        // declared private in SseController and cannot be called from this class.
        sseController.broadcast(heartbeat);
    }
}
这个完整的实现提供了:
- 后端功能:连接管理、消息广播、定向发送、连接监控
- 前端功能:连接管理、消息收发、事件监听、状态显示
- 错误处理:自动重连、连接状态监控
- 扩展性:易于添加新的消息类型和业务逻辑
你可以根据具体需求调整消息格式、认证方式和业务逻辑。
到此这篇关于springboot结合vue实现sse接口详细流程的文章就介绍到这了,更多相关springboot vue实现sse接口内容请搜索脚本之家以前的文章或继续浏览下面的相关文章,希望大家以后多多支持脚本之家!
