springboot整合SSE技术开发小结
作者:飞翔的佩奇
一、开发背景
公司需要开发一个大屏界面,大屏页面的数据是实时更新的,由后端主动实时推送数据给大屏页面。此时会立刻联想到:websocket 技术。当然使用websocket,确实可以解决这个场景。但是今天本文的主角是 :SSE,他和websocket略有不同,SSE只能由服务端主动发消息,而websocket前后端都可以推送消息。
二、快速了解SSE
1、概念
SSE全称 Server Sent Event,顾名思义,就是服务器发送事件,所以也就注定了他 只能由服务端发送信息。
2、特性
- 主动从服务端推送消息的技术
- 本质是一个HTTP的长连接
- 发送的是一个stream流,格式为text/event-stream
三、开发思路
要实现后端的实时推送消息,前台实时更新数据,思路如下:
- 1、前后端需要建立连接
- 2、后端如何做到实时推送信息呢?可以采用定时调度
四、代码演示
1、引入依赖
原则上是不需要引入的,因为springboot底层已经整合了SSE
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency>
2、服务端代码
controller层
@RestController @CrossOrigin @RequestMapping("/sse") public class SseEmitterController extends BaseController { @Autowired private SseEmitterService sseEmitterService; /** * 创建SSE连接 * * @return */ @GetMapping("/connect/{type}") public SseEmitter connect(@PathVariable("type") String type) { return sseEmitterService.connect(type); } }
service层
public interface SseEmitterService { SseEmitter connect(String type); void volumeOverview(); void sysOperation(); void monitor(); ........ }
service实现层
@Service public class SseEmitterServiceImpl implements SseEmitterService { private final Logger logger = LoggerFactory.getLogger(this.getClass()); private static Map<String, SseEmitterUTF8> sseCache = new ConcurrentHashMap<>(); /** * 创建连接sse * @param type * @return */ @Override public SseEmitter connect(String type) { final String clientId = UUID.randomUUID().toString().replace("-", ""); SseEmitterUTF8 sseEmitter = new SseEmitterUTF8(0L); try { sseEmitter.send(SseEmitter.event().comment("创建连接成功 !!!")); } catch (IOException e) { logger.error("创建连接失败 , {} " , e.getMessage()); } sseEmitter.onCompletion(() -> { logger.info("connect onCompletion , {} 结束连接 ..." , clientId); removeClient(clientId); }); sseEmitter.onTimeout(() -> { logger.info("connect onTimeout , {} 连接超时 ..." , clientId); removeClient(clientId); }); sseEmitter.onError((throwable) -> { logger.error("connect onError , {} 连接异常 ..." , clientId); removeClient(clientId); }); sseCache.put(clientId, sseEmitter); //立即推送 volumeOverview(); dealResp(); monitor(); if (type.equals(SseEmitterConstant.OVER_VIEW)){ sysOperation(); mileStone(); } logger.info("当前用户总连接数 : {} " , sseCache.size()); return sseEmitter; } /** * 交易量概览 */ @Override public void volumeOverview() { Map<String,Object> map = new HashMap<>(); map.put("latest_tps",440.3); map.put("total_cics_trans",341656001); map.put("total_zjcx_trans",391656001); map.put("zjcx_tps",23657); map.put("day10",48388352); map.put("history",105013985); SseEmitter.SseEventBuilder data = SseEmitter.event().name(SseEmitterConstant.VOLUME_OVERVIEW).data(map, MediaType.APPLICATION_JSON); for (Map.Entry<String, SseEmitterUTF8> entry : sseCache.entrySet()) { SseEmitterUTF8 sseEmitter = entry.getValue(); if (sseEmitter == null) { continue; } try { sseEmitter.send(data); } catch (IOException e) { String body = "SseEmitterServiceImpl[volumeOverview ]"; logger.error(body + ": 向客户端 {} 推送消息失败 , 尝试进行重推 : {}", entry.getKey() ,e.getMessage()); messageRepush(entry.getKey(),data,body); } } } private void messageRepush(String type, SseEmitter.SseEventBuilder data,String body){ for (int i = 0; i < 3; i++) { try { Thread.sleep(2000); SseEmitterUTF8 sseEmitter = sseCache.get(type); if (sseEmitter == null) { logger.error(body + " :向客户端{} 第{}次消息重推失败,未创建长链接", type, i + 1); continue; } sseEmitter.send(data); } catch (Exception ex) { logger.error(body + " :向客户端{} 第{}次消息重推失败", type, i + 1, ex); continue; } logger.info(body + " :向客户端{} 第{}次消息重推成功", type, i + 1); return; } }
常量类
public class SseEmitterConstant { /** * 创建连接的客户端类型 */ public static final String OVER_VIEW = "overview"; /** * even 数据类型 */ public static final String VOLUME_OVERVIEW = "vw"; public SseEmitterConstant(){} }
3、后端定时任务代码
采用注解的方式实现:@Scheduled,使用该注解时,需要增加这个注解@EnableScheduling,相当于来开启定时调度功能,如果不加@EnableScheduling注解,那么定时调度会不生效的。
启动类增加注解@EnableScheduling
package com.hidata; import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration; import org.springframework.context.annotation.ComponentScan; import org.springframework.scheduling.annotation.EnableScheduling; @SpringBootApplication(exclude = { DataSourceAutoConfiguration.class }) @EnableScheduling public class HidataApplication { public static void main(String[] args) { SpringApplication.run(HidataApplication.class, args); System.out.println("[HiUrlShorter platform startup!]"); } }
创建 定时任务调度类,在该类上加上@Scheduled注解,
@Configuration public class SendMessageTask{ private final Logger logger = LoggerFactory.getLogger(this.getClass()); @Autowired private SseEmitterService sseEmitterService; @Scheduled(cron = "0/40 * * * * ?}") public void volumeOverviewTask() { try { sseEmitterService.volumeOverview(); } catch (Exception e) { logger.error("SendMessageTask [volumeOverviewTask]: {} ",e.getMessage()); } } ....... }
4、解决乱码的实体类
如果发送中文数据的时候,会出现乱码的现象。此时需要做对应的处理
package com.hidata.devops.lagrescreen.domain; import org.springframework.http.HttpHeaders; import org.springframework.http.MediaType; import org.springframework.http.server.ServerHttpResponse; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import java.nio.charset.StandardCharsets; public class SseEmitterUTF8 extends SseEmitter { public SseEmitterUTF8(Long timeout) { super(timeout); } @Override protected void extendResponse(ServerHttpResponse outputMessage) { super.extendResponse(outputMessage); HttpHeaders headers = outputMessage.getHeaders(); headers.setContentType(new MediaType(MediaType.TEXT_EVENT_STREAM, StandardCharsets.UTF_8)); } }
5、前端代码
// 连接服务器 var sseSource = new EventSource("http://localhost:8080/sse/connect"); // 连接打开 sseSource.onopen = function () { console.log("连接打开"); } // 连接错误 sseSource.onerror = function (err) { console.log("连接错误:", err); } //接收信息 eventSource.addEventListener("vw", function (event) { console.log(event.data); ..... });
五、核心代码分析
先看代码片段
SseEmitter.event().name("vw").data(map, MediaType.APPLICATION_JSON);
分析:
后端不会把所有数据一起发送给前端,而是会把页面分成多个模块,然后发给前端,此时前端需要区分哪一块数据对应哪一块页面。所以我们可以给各个模块的数据起个名字。也就是上述的代码
SseEmitter.event().name("vw")
这样,前端就知道怎么渲染页面了,类似于这样
关于even()的属性,可以查看源码,
public interface SseEventBuilder { SseEmitter.SseEventBuilder id(String var1); SseEmitter.SseEventBuilder name(String var1); SseEmitter.SseEventBuilder reconnectTime(long var1); SseEmitter.SseEventBuilder comment(String var1); SseEmitter.SseEventBuilder data(Object var1); SseEmitter.SseEventBuilder data(Object var1, @Nullable MediaType var2); Set<DataWithMediaType> build(); }
到此这篇关于springboot整合SSE技术开发小结的文章就介绍到这了,更多相关springboot整合SSE内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!