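A Complete Guide to Implementing SSE (Server-Sent Events) in Spring Boot
Author: 堕落年代
This article shows how to implement SSE in a Spring Boot application. With a few steps and code examples, you can add real-time data push to your web application.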
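Introduction
Implementing SSE (Server-Sent Events) in Spring Boot is straightforward. SSE is a technique in which the server pushes events to the client over a single long-lived HTTP response. The sections below walk through a complete implementation.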
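To make "pushing events" concrete, it may help to see what travels over the wire. The sketch below formats one event in the text/event-stream format using only the JDK. SseFrame and its format method are hypothetical names used for illustration; in a Spring application SseEmitter produces this output for you.

```java
import java.util.Objects;

/** Minimal sketch of the text/event-stream wire format (hypothetical helper, not a Spring class). */
final class SseFrame {

    private SseFrame() {}

    /**
     * Formats one event. id and name may be null, in which case the field is omitted.
     * Multi-line data is written as one "data:" line per line of text.
     */
    static String format(String id, String name, String data) {
        Objects.requireNonNull(data, "data");
        StringBuilder sb = new StringBuilder();
        if (id != null) {
            sb.append("id:").append(id).append('\n');
        }
        if (name != null) {
            sb.append("event:").append(name).append('\n');
        }
        for (String line : data.split("\n", -1)) {
            sb.append("data:").append(line).append('\n');
        }
        // A blank line terminates the event and tells the client to dispatch it.
        return sb.append('\n').toString();
    }

    public static void main(String[] args) {
        System.out.print(format("0", "message", "Event #0"));
    }
}
```

Each event is a group of "field:value" lines followed by an empty line, which is why the client can process events one at a time while the response stays open.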
Basic Implementation
1. Add the dependency
Make sure pom.xml includes the Spring Web dependency:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
2. Create the SSE controller
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@RestController
public class SseController {

    // Thread pool used to send events asynchronously
    private final ExecutorService executor = Executors.newCachedThreadPool();

    @GetMapping(path = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter handleSse() {
        SseEmitter emitter = new SseEmitter(60_000L); // timeout in milliseconds

        // Send events on a separate thread so the request thread is released
        executor.execute(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    // Build the event
                    SseEmitter.SseEventBuilder event = SseEmitter.event()
                            .id(String.valueOf(i))   // event ID
                            .name("message")         // event name
                            .data("Event #" + i);    // event payload

                    // Send the event
                    emitter.send(event);

                    // Simulate a delay
                    Thread.sleep(1000);
                }
                // All events sent
                emitter.complete();
            } catch (IOException | InterruptedException e) {
                // Close the connection on error
                emitter.completeWithError(e);
            }
        });

        // Handle completion and timeout
        emitter.onCompletion(() -> System.out.println("SSE completed"));
        emitter.onTimeout(() -> {
            System.out.println("SSE timeout");
            emitter.complete();
        });

        return emitter;
    }
}
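3. Listen for SSE on the front end
<!DOCTYPE html>
<html>
<head>
    <title>SSE Demo</title>
</head>
<body>
    <div id="events"></div>
    <script>
        const eventSource = new EventSource('/sse');

        // Handle events of the default type, "message"
        eventSource.onmessage = function(event) {
            const data = event.data;
            const element = document.createElement('p');
            element.textContent = 'Received: ' + data;
            document.getElementById('events').appendChild(element);
        };

        // addEventListener works for any event name. "message" is the default
        // type, so this handler fires for the same events as onmessage above;
        // use the server-side event name here to listen for custom events.
        eventSource.addEventListener('message', function(event) {
            console.log('Custom event:', event.data);
        });

        // Error handling. Calling close() also stops the browser's automatic reconnection.
        eventSource.onerror = function(error) {
            console.error('EventSource error:', error);
            eventSource.close();
        };
    </script>
</body>
</html>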
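Adding Authentication
1. Token-based authentication
// Additional imports needed by this handler:
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.server.ResponseStatusException;

@GetMapping(path = "/secure-sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter handleSecureSse(@RequestHeader("Authorization") String token) {
    // Validate the token before opening the stream
    if (!isValidToken(token)) {
        throw new ResponseStatusException(HttpStatus.UNAUTHORIZED, "Invalid token");
    }

    SseEmitter emitter = new SseEmitter();

    // Look up the user (User is an application-defined class)
    User user = getUserFromToken(token);

    executor.execute(() -> {
        try {
            // Send a personalized event
            emitter.send(SseEmitter.event()
                    .data("Welcome, " + user.getName())
                    .name("greeting"));
            // Continue sending other events...
        } catch (IOException e) {
            emitter.completeWithError(e);
        }
    });

    return emitter;
}

private boolean isValidToken(String token) {
    // Placeholder: only checks the prefix. Implement real token validation here.
    return token != null && token.startsWith("Bearer ");
}

private User getUserFromToken(String token) {
    // Extract the user from the token
    return new User("John Doe"); // example value
}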
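The isValidToken placeholder above only checks the "Bearer " prefix. As a small step toward real validation, the sketch below separates the scheme from the credential so the raw token can be handed to whatever verifier the application uses. BearerTokens and extract are hypothetical names, and no signature or expiry check is performed here; that part depends on the token format and is left to the application.

```java
import java.util.Optional;

/** Hypothetical helper that pulls the raw token out of an Authorization header value. */
final class BearerTokens {

    private static final String PREFIX = "Bearer ";

    private BearerTokens() {}

    /** Returns the token after "Bearer ", or empty if the header is missing, malformed, or blank. */
    static Optional<String> extract(String authorizationHeader) {
        if (authorizationHeader == null
                || !authorizationHeader.regionMatches(true, 0, PREFIX, 0, PREFIX.length())) {
            return Optional.empty();
        }
        String token = authorizationHeader.substring(PREFIX.length()).trim();
        return token.isEmpty() ? Optional.empty() : Optional.of(token);
    }

    public static void main(String[] args) {
        System.out.println(extract("Bearer abc.def.ghi")); // prints Optional[abc.def.ghi]
        System.out.println(extract("Basic dXNlcg=="));     // prints Optional.empty
    }
}
```

Returning Optional keeps the "no usable token" case explicit, so the handler can respond with 401 in one place instead of scattering null checks.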
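2. Send credentials from the front end
// The native EventSource constructor only accepts { withCredentials } and cannot
// set request headers, so sending an Authorization header needs a polyfill or a
// fetch-based client. Sketch using EventSourcePolyfill from the
// event-source-polyfill npm package:
import { EventSourcePolyfill } from 'event-source-polyfill';

const token = "Bearer your_jwt_token_here";
const eventSource = new EventSourcePolyfill('/secure-sse', {
    headers: { Authorization: token }
});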
Advanced Features
1. Broadcast events to multiple clients
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

@Service
public class SseService {

    // Thread-safe list of all currently connected clients
    private final List<SseEmitter> emitters = new CopyOnWriteArrayList<>();

    public SseEmitter subscribe() {
        SseEmitter emitter = new SseEmitter(60_000L);
        emitters.add(emitter);

        // Forget the emitter once its connection ends
        emitter.onCompletion(() -> emitters.remove(emitter));
        emitter.onTimeout(() -> emitters.remove(emitter));

        return emitter;
    }

    public void broadcast(String eventName, Object data) {
        for (SseEmitter emitter : emitters) {
            try {
                emitter.send(SseEmitter.event()
                        .name(eventName)
                        .data(data));
            } catch (IOException e) {
                // The client is gone: drop it and close the emitter
                emitters.remove(emitter);
                emitter.completeWithError(e);
            }
        }
    }
}
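The broadcast loop above modifies the list while iterating over it, which is only safe because of the list type chosen. The JDK-only sketch below isolates that pattern so it can be run without Spring. BroadcastRegistry and its Subscriber interface are hypothetical stand-ins for SseService and SseEmitter.

```java
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/** JDK-only sketch of the broadcast pattern; Subscriber is a hypothetical stand-in for SseEmitter. */
final class BroadcastRegistry {

    /** Anything that can receive a message and may fail with an I/O error. */
    interface Subscriber {
        void send(String message) throws IOException;
    }

    private final List<Subscriber> subscribers = new CopyOnWriteArrayList<>();

    void subscribe(Subscriber subscriber) {
        subscribers.add(subscriber);
    }

    int size() {
        return subscribers.size();
    }

    /** Sends the message to every subscriber and returns the number of successful deliveries. */
    int broadcast(String message) {
        int delivered = 0;
        for (Subscriber subscriber : subscribers) {
            try {
                subscriber.send(message);
                delivered++;
            } catch (IOException e) {
                // Safe during iteration: CopyOnWriteArrayList iterates over a snapshot.
                subscribers.remove(subscriber);
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        BroadcastRegistry registry = new BroadcastRegistry();
        registry.subscribe(msg -> System.out.println("got " + msg));
        registry.subscribe(msg -> { throw new IOException("client went away"); });
        System.out.println(registry.broadcast("hello") + " delivered, " + registry.size() + " left");
    }
}
```

CopyOnWriteArrayList copies the backing array on every add and remove, which suits this workload: subscriptions change rarely compared with how often the list is iterated for a broadcast.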
2. Use the broadcast service in a controller
// Also requires imports for ResponseEntity, PostMapping and RequestBody
@RestController
public class SseController {

    private final SseService sseService;

    public SseController(SseService sseService) {
        this.sseService = sseService;
    }

    @GetMapping("/subscribe")
    public SseEmitter subscribe() {
        return sseService.subscribe();
    }

    @PostMapping("/broadcast")
    public ResponseEntity<String> broadcastMessage(@RequestBody String message) {
        sseService.broadcast("message", message);
        return ResponseEntity.ok("Message broadcasted");
    }
}
3. Send JSON data
// Passing a media type makes Spring serialize the object as JSON
emitter.send(SseEmitter.event()
        .name("userUpdate")
        .data(new User("Alice", "alice@example.com"), MediaType.APPLICATION_JSON));
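4. Reconnection
// EventSource reconnects automatically after a dropped connection. Closing it
// and reconnecting manually, as here, gives control over the retry delay.
let eventSource;

function connectSSE() {
    eventSource = new EventSource('/sse');

    eventSource.onmessage = event => {
        console.log('Received:', event.data);
    };

    eventSource.onerror = () => {
        console.log('Connection lost. Reconnecting...');
        eventSource.close();
        setTimeout(connectSSE, 3000); // reconnect after 3 seconds
    };
}

connectSSE(); // initial connection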
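Reconnecting restores the stream but not the events missed while the client was offline. When a browser EventSource reconnects on its own, it sends the ID of the last event it received in the Last-Event-ID request header, and the server can use that to replay what was missed. The sketch below is one possible server-side building block under that assumption: a bounded in-memory buffer with sequential numeric IDs. ReplayBuffer and its methods are hypothetical names, not part of Spring.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical bounded buffer of recent events, used to replay events a client missed. */
final class ReplayBuffer {

    /** One buffered event with its sequential ID. */
    static final class Event {
        final long id;
        final String data;

        Event(long id, String data) {
            this.id = id;
            this.data = data;
        }
    }

    private final int capacity;
    private final Deque<Event> events = new ArrayDeque<>();
    private long nextId = 0;

    ReplayBuffer(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    /** Stores a new event, evicting the oldest one when the buffer is full. */
    synchronized Event append(String data) {
        Event event = new Event(nextId++, data);
        if (events.size() == capacity) {
            events.removeFirst();
        }
        events.addLast(event);
        return event;
    }

    /** Returns the buffered events whose ID is greater than lastEventId, oldest first. */
    synchronized List<Event> eventsAfter(long lastEventId) {
        List<Event> missed = new ArrayList<>();
        for (Event event : events) {
            if (event.id > lastEventId) {
                missed.add(event);
            }
        }
        return missed;
    }

    public static void main(String[] args) {
        ReplayBuffer buffer = new ReplayBuffer(100);
        for (int i = 0; i < 5; i++) {
            buffer.append("Event #" + i);
        }
        // A client reconnecting with Last-Event-ID: 2 would be sent events 3 and 4.
        for (Event event : buffer.eventsAfter(2)) {
            System.out.println(event.id + ": " + event.data);
        }
    }
}
```

In a controller, the header could be read with @RequestHeader(value = "Last-Event-ID", required = false) and the missed events sent before new ones. Events older than the buffer capacity are lost, so the capacity is a trade-off between memory and how long a client may stay disconnected.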
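Production Best Practices
1. Configure timeouts and heartbeats
// Create one emitter per request. An SseEmitter must not be a shared singleton
// @Bean, because each emitter belongs to exactly one client connection.
public SseEmitter createSseEmitter() {
    SseEmitter emitter = new SseEmitter(120_000L); // 2-minute timeout

    // Heartbeat: send a ping every 30 seconds
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    scheduler.scheduleAtFixedRate(() -> {
        try {
            emitter.send(SseEmitter.event()
                    .name("heartbeat")
                    .data("ping"));
        } catch (IOException | IllegalStateException e) {
            // The client is gone or the emitter has already completed
            scheduler.shutdown();
        }
    }, 0, 30, TimeUnit.SECONDS);

    // Stop the heartbeat when the connection ends; otherwise the scheduler thread leaks
    emitter.onCompletion(scheduler::shutdown);
    emitter.onTimeout(scheduler::shutdown);

    return emitter;
}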
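2. Exception handling
// SseException is an application-defined exception, not a Spring class.
// This response body only reaches the client if the exception is thrown
// before the event stream has started.
@RestControllerAdvice
public class SseExceptionHandler {

    @ExceptionHandler(SseException.class)
    public ResponseEntity<String> handleSseException(SseException ex) {
        return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                .body("SSE Error: " + ex.getMessage());
    }
}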
3. CORS configuration
@Configuration
public class WebConfig implements WebMvcConfigurer {

    @Override
    public void addCorsMappings(CorsRegistry registry) {
        registry.addMapping("/sse/**")
                .allowedOrigins("https://your-frontend.com")
                .allowedMethods("GET")
                .allowCredentials(true);
    }
}
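4. Performance tuning
// This executor backs @Async methods, so @EnableAsync is required. SseEmitter
// does not pick it up by itself: run the event-sending work in an @Async method
// or submit it to this executor explicitly.
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {

    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(50);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("SSE-Executor-");
        executor.initialize();
        return executor;
    }
}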
Complete Example: Real-Time Stock Quotes
Back-end controller
@RestController
public class StockController {

    private final SseService sseService;
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

    public StockController(SseService sseService) {
        this.sseService = sseService;
        startStockUpdates();
    }

    @GetMapping("/stocks")
    public SseEmitter getStockUpdates() {
        return sseService.subscribe();
    }

    // Broadcast simulated prices to all subscribers every 2 seconds
    private void startStockUpdates() {
        scheduler.scheduleAtFixedRate(() -> {
            Map<String, Double> stocks = Map.of(
                    "AAPL", 150 + Math.random() * 10,
                    "MSFT", 250 + Math.random() * 15,
                    "GOOGL", 2800 + Math.random() * 50
            );
            sseService.broadcast("stockUpdate", stocks);
        }, 0, 2, TimeUnit.SECONDS);
    }
}
Front-end implementation
<div id="stock-prices"></div>
<script>
    const eventSource = new EventSource('/stocks');

    // Re-render the price list whenever a stockUpdate event arrives
    eventSource.addEventListener('stockUpdate', event => {
        const stocks = JSON.parse(event.data);
        let html = '<h2>Stock Prices</h2><ul>';
        for (const [symbol, price] of Object.entries(stocks)) {
            html += `<li>${symbol}: $${price.toFixed(2)}</li>`;
        }
        html += '</ul>';
        document.getElementById('stock-prices').innerHTML = html;
    });
</script>
Deployment Notes
1. Load balancer configuration
# Nginx configuration
location /sse {
    proxy_pass http://backend;
    proxy_http_version 1.1;
    proxy_set_header Connection '';
    proxy_buffering off;
}
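2. Spring Boot configuration
# application.properties
# With this context path, every endpoint is served under /api (for example /api/sse)
server.servlet.context-path=/api
# Async request timeout of 2 minutes. Comments must be on their own line in a
# .properties file; a trailing "# ..." would become part of the value.
spring.mvc.async.request-timeout=120000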
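3. Monitoring endpoint
// The endpoint must be a Spring bean and must be exposed, for example with
// management.endpoints.web.exposure.include=sse
// getActiveConnections() and getLastBroadcastTime() are not part of the
// SseService shown earlier and need to be added to it.
@Component
@Endpoint(id = "sse")
public class SseEndpoint {

    private final SseService sseService;

    public SseEndpoint(SseService sseService) {
        this.sseService = sseService;
    }

    @ReadOperation
    public Map<String, Object> sseMetrics() {
        return Map.of(
                "activeConnections", sseService.getActiveConnections(),
                "lastBroadcast", sseService.getLastBroadcastTime()
        );
    }
}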
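The endpoint relies on two getters that the SseService shown earlier does not define. One way to back them, sketched here with the JDK only, is a small thread-safe counter object that the service updates on subscribe, completion, and broadcast. SseMetrics and its method names are hypothetical. The last broadcast time is returned as a non-null string because Map.of rejects null values.

```java
import java.time.Instant;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical thread-safe holder for the two values reported by the monitoring endpoint. */
final class SseMetrics {

    private final AtomicInteger activeConnections = new AtomicInteger();
    private final AtomicReference<Instant> lastBroadcast = new AtomicReference<>();

    /** Call when a client subscribes. */
    void connectionOpened() {
        activeConnections.incrementAndGet();
    }

    /** Call from the emitter's completion and timeout callbacks. */
    void connectionClosed() {
        activeConnections.decrementAndGet();
    }

    /** Call after each broadcast. */
    void broadcastSent() {
        lastBroadcast.set(Instant.now());
    }

    int getActiveConnections() {
        return activeConnections.get();
    }

    /** ISO-8601 timestamp of the last broadcast, or "never"; never null so it is safe for Map.of. */
    String getLastBroadcastTime() {
        Instant last = lastBroadcast.get();
        return last == null ? "never" : last.toString();
    }

    public static void main(String[] args) {
        SseMetrics metrics = new SseMetrics();
        metrics.connectionOpened();
        metrics.broadcastSent();
        System.out.println(metrics.getActiveConnections() + " active, last broadcast " + metrics.getLastBroadcastTime());
    }
}
```

If the service already keeps its emitters in a list, the list size can serve as the connection count instead; a separate counter is mainly useful when the same metrics object is shared across several services.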
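Best Practices Summary
- Use a dedicated service class: encapsulate the SSE logic so it can be reused
- Implement heartbeats: keep idle connections from being dropped by proxies or load balancers
- Add authentication: protect sensitive data
- Handle errors gracefully: combine exception handling with reconnection
- Monitor connection state: use an Actuator endpoint to track SSE connections
- Tune the thread pool: size the asynchronous worker threads for your load
- Reconnect on the front end: recover dropped connections automatically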
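With the implementation above, you can create SSE endpoints in a Spring Boot application that push real-time events from the server to the client while also meeting authentication requirements.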