Springboot 集成 SocketIO的示例代码
作者:我只会发热
1 前言
1.1 什么是 SocketIO ?
Socket.IO 是一个可以在浏览器与服务器之间实现实时、双向、基于事件的通信的工具库。 Socket.IO 能够在任何平台、浏览器或设备上运行,可靠性和速度同样出色。其本质上是将 webSocket、Ajax 和其他通信方式再封装了一层,更强大,适应性和兼容性更好。
(这句话怎么理解呢?简单的来说,就是客户端可以给服务端发消息,服务端也可以给客户端发消息,而链接它们之间的消息纽带,就是“事件监听”。)
1.2 webSocket 的优点
webSocket 和 socket.io 区别?
- webSocketa:一种让客户端和服务器之间能进行双向实时通信的技术
b:使用时,虽然主流浏览器都已经支持,但仍然可能有不兼容的情况
c:适合用于client和基于node搭建的服务端使用 - socket.ioa:将 webSocket、Ajax 和其它的通信方式全部封装成了统一的通信接口
b:使用时,不用担心兼容问题,底层会自动选用最佳的通信方式
c:适合进行服务端和客户端双向数据通信
d:Socket.IO中文网地址:https://socket.nodejs.cn/docs/v4/
1.3 应用及版本
- spring-boot:2.5.14
- socketio:2.0.3
- jdk:java8
- 本文是基于《若依前后端分离》版本的基础上进行代码编写和演示的
2 物料准备(均为后端代码)
2.1 添加 Socket 依赖包
<dependency> <groupId>com.corundumstudio.socketio</groupId> <artifactId>netty-socketio</artifactId> <version>2.0.3</version> </dependency>
2.2 创建频道常量类:SocketEventContants
我这个常量类是为了统一频道所建,你们不一定需要这个类
package com.mss.common.constant; /** * @Description: Socket 自定义事件名称 * @Author: zhanleai */ public class SocketEventContants { /** * 用户频道 **/ public static final String CHANNEL_USER = "channel_user"; /** * 系统频道 **/ public static final String CHANNEL_SYSTEM = "channel_system"; }
2.3 创建 Socket 连接类:SocketHandler
- 用来监听 socket 客户端上下线,以及服务端自动关闭;
- 有些博主把这个类的内容跟工具类里监听事件方法放在一起,个人认为需要解耦,特别是在分布式的项目中;
package com.mss.framework.handle; import com.corundumstudio.socketio.SocketIOServer; import com.mss.common.utils.socket.SocketUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import org.springframework.stereotype.Component; /** * @Author: zhanleai * @Description: 客户端自动连接和断开、服务端关闭 */ @Component @Slf4j public class SocketHandler { @Autowired private SocketIOServer socketIoServer; /** * 容器销毁前,自动调用此方法,关闭 socketIo 服务端 * * @Param [] * @return **/ @PreDestroy private void destroy(){ try { log.debug("关闭 socket 服务端"); socketIoServer.stop(); }catch (Exception e){ e.printStackTrace(); } } @PostConstruct public void init() { log.debug("SocketEventListener initialized"); //添加监听,客户端自动连接到 socket 服务端 socketIoServer.addConnectListener(client -> { String userId = client.getHandshakeData().getSingleUrlParam("userId"); SocketUtil.connectMap.put(userId, client); log.debug("客户端userId: "+ userId+ "已连接,客户端ID为:" + client.getSessionId()); }); //添加监听,客户端跟 socket 服务端自动断开 socketIoServer.addDisconnectListener(client -> { String userId = client.getHandshakeData().getSingleUrlParam("userId"); SocketUtil.connectMap.remove(userId, client); log.debug("客户端userId:" + userId + "断开连接,客户端ID为:" + client.getSessionId()); }); } // // 注释说明:以下 onConnect和 onDisconnect 方法在某些场景下会失效,不建议使用,所以注释掉 // /** // * 客户端自动连接到 socket 服务端 // * // * @Param [client] // * @return // **/ // @OnConnect // public void onConnect(SocketIOClient client) { // String userId = client.getHandshakeData().getSingleUrlParam("userId"); // SocketUtil.connectMap.put(userId, client); // log.debug("客户端userId: "+ userId+ "已连接,客户端ID为:" + client.getSessionId()); // } // // /** // * 客户端跟 socket 服务端自动断开 // * // * @Param [client] // * @return // **/ // @OnDisconnect // public void onDisconnect(SocketIOClient client) { // String userId = client.getHandshakeData().getSingleUrlParam("userId"); // log.debug("客户端userId:" + userId + "断开连接,客户端ID为:" + client.getSessionId()); // SocketUtil.connectMap.remove(userId, client); // } }
2.4 Socket 配置文件和配置类
用来定义 socket 的一些配置
2.4.1 yml 配置
socketio: host: 127.0.0.1 //主机名,默认是 0.0.0.0 (这个设不设置无所谓,因为后面的 SocketConfig 类一般不用设置这个) port: 33000 //监听端口 maxFramePayloadLength: 1048576 maxHttpContentLength: 1048576 bossCount: 1 workCount: 100 allowCustomRequests: true upgradeTimeout: 1000000 //协议升级超时时间(毫秒),默认10000。HTTP握手升级为ws协议超时时间 pingTimeout: 6000000 //Ping消息超时时间(毫秒),默认60000,这个时间间隔内没有接收到心跳消息就会发送超时事件 pingInterval: 25000 //Ping消息间隔(毫秒),默认25000。客户端向服务器发送一条心跳消息间隔
2.4.2 配置类:SocketConfig
package com.mss.framework.config; import com.corundumstudio.socketio.SocketIOServer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; @Component public class SocketConfig { @Value("${socketio.host}") private String host; @Value("${socketio.port}") private Integer port; @Value("${socketio.bossCount}") private int bossCount; @Value("${socketio.workCount}") private int workCount; @Value("${socketio.allowCustomRequests}") private boolean allowCustomRequests; @Value("${socketio.upgradeTimeout}") private int upgradeTimeout; @Value("${socketio.pingTimeout}") private int pingTimeout; @Value("${socketio.pingInterval}") private int pingInterval; @Bean public SocketIOServer socketIOServer() { com.corundumstudio.socketio.Configuration configuration = new com.corundumstudio.socketio.Configuration(); configuration.setPort(port); com.corundumstudio.socketio.SocketConfig socketConfig=new com.corundumstudio.socketio.SocketConfig(); socketConfig.setReuseAddress(true); configuration.setSocketConfig(socketConfig); configuration.setOrigin(null); configuration.setBossThreads(bossCount); configuration.setWorkerThreads(workCount); configuration.setAllowCustomRequests(allowCustomRequests); configuration.setUpgradeTimeout(upgradeTimeout); configuration.setPingTimeout(pingTimeout); configuration.setPingInterval(pingInterval); //设置 sessionId 随机 configuration.setRandomSession(true); // configuration.setKeyStorePassword("pi0yo93pqgrs"); // configuration.setKeyStore(this.getClass().getResourceAsStream("www.ibms.club.jks")); // configuration.setAuthorizationListener(data -> { // String token = data.getSingleUrlParam("token"); // return StrUtil.isNotBlank(token); // }); //初始化 Socket 服务端配置 return new SocketIOServer(configuration); } /** * Spring加载 SocketIOServer * * @Param [server] * @return **/ @Bean public SpringAnnotationScanner springAnnotationScanner(SocketIOServer socketIOServer ) { return new SpringAnnotationScanner(socketIOServer ); } }
2.5 Socket 服务启动类:ServerRunner
实现 CommandLineRunner 接口类,项目启动时自动执行 socketIOServer.start() 方法
package com.mss.framework.run; import com.corundumstudio.socketio.SocketIOServer; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.CommandLineRunner; import org.springframework.stereotype.Component; @Slf4j @Component @AllArgsConstructor public class ServerRunner implements CommandLineRunner { private final SocketIOServer socketIOServer; /** * 项目启动时,自动启动 socket 服务,服务端开始工作 * * @Param [args] * @return **/ @Override public void run(String... args) { socketIOServer.start(); log.info("socket.io server started !"); } }
2.6 Socket 工具类:SocketUtil
下列实例代码中,是使用 userId 来当做客户端唯一标识,这个每个人可以根据自己项目里自行设置;
下列实例代码的应用场景,只有服务端向客户端发送消息的需求,所以实际这个工具类只有 sendToOne() 方法是实际起作用的,其余的代码都是为了本文额外写的方法;
package com.mss.common.utils.socket; import com.corundumstudio.socketio.SocketIOClient; import com.corundumstudio.socketio.annotation.OnEvent; import com.mss.common.constant.SocketEventContants; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; /** * @Author: zhanleai * @Description: */ @Component @Slf4j public class SocketUtil { //暂且把用户&客户端信息存在缓存 public static ConcurrentMap<String, SocketIOClient> connectMap = new ConcurrentHashMap<>(); /** * 单发消息(以 userId 为标识符,给用户发送消息) * * @Param [userId, message] * @return **/ public static void sendToOne(String userId, Object message) { //拿出某个客户端信息 SocketIOClient socketClient = getSocketClient(userId); if (Objects.nonNull(socketClient) ){ //单独给他发消息 socketClient.sendEvent(SocketEventContants.CHANNEL_USER,message); }else{ log.info(userId + "已下线,暂不发送消息。"); } } /** * 群发消息 * * @Param * @return **/ public static void sendToAll(Object message) { if (connectMap.isEmpty()){ return; } //给在这个频道的每个客户端发消息 for (Map.Entry<String, SocketIOClient> entry : connectMap.entrySet()) { entry.getValue().sendEvent(SocketEventContants.CHANNEL_SYSTEM, message); } } /** * 根据 userId 识别出 socket 客户端 * @param userId * @return */ public static SocketIOClient getSocketClient(String userId){ SocketIOClient client = null; if (StringUtils.hasLength(userId) && !connectMap.isEmpty()){ for (String key : connectMap.keySet()) { if (userId.equals(key)){ client = connectMap.get(key); } } } return client; } /** * 1)使用事件注解,服务端监听获取客户端消息; * 2)拿到客户端发过来的消息之后,可以再根据业务逻辑发送给想要得到这个消息的人; * 3)channel_system 之所以会向全体客户端发消息,是因为我跟前端约定好了,你们也可以自定定义; * * @Param message * @return **/ @OnEvent(value = SocketEventContants.CHANNEL_SYSTEM) public void channelSystemListener(String message) { if (!StringUtils.hasLength(message)){ return; } this.sendToAll(message); } }
3 Socket 调用
3.1 实际项目的应用场景:在需要发送消息通知的业务代码中调用
这个方法里有几个类:Message、DateUtils、IMessageService、MessageMapper,均为根据自身业务场景自定义的类,你们自己建吧。有需要再私信我要;
后端代码写到这里,实际上已经写完了。从 3.2 开始均为测试代码;
package com.mss.message.service.impl; import com.mss.common.utils.DateUtils; import com.mss.common.utils.socket.SocketUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import com.mss.message.mapper.MessageMapper; import com.mss.message.domain.entity.Message; import com.mss.message.service.IMessageService; /** * 消息Service业务层处理 * * @author zhanleai */ @Service @Slf4j public class MessageServiceImpl implements IMessageService { @Autowired private MessageMapper messageMapper; /** * 新增消息 * * @param message 消息 * @return 结果 */ @Override public int insertMessage(Message message) { message.setSendTime(DateUtils.getNowDate()); // 消息入库,消息持久化 int i = messageMapper.insertMessage(message); if(i > 0){ // 新增消息之后,再向前端推送 Socket 消息 SocketUtil.sendToOne(message.getSendUserId().toString(),message); } return i; } }
3.2 测试Controller
下文均为测试的代码
package com.mss.message.controller; import com.mss.common.utils.socket.SocketUtil; import io.swagger.annotations.Api; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import com.mss.common.core.controller.BaseController; import com.mss.common.core.domain.AjaxResult; /** * 消息Controller * * @author zhanleai */ @RestController @Api(tags="消息") @RequestMapping("/message") public class MessageController extends BaseController { /** * 给指定客户端发送消息 * * @Param [userId, message] * @return **/ @GetMapping("/sendToOne") public AjaxResult sendToOne(String userId , String message){ SocketUtil.sendToOne(userId,message); return AjaxResult.success("单独发送消息成功。"); } }
4 前端调用代码
- 前端代码监听了 channel_user 和 channel_system 两个频道,一个做了三个动作:
- 1)连接上服务端;
2)监听并接收 channel_user 频道的消息;
3)给服务端发送一条消息,并广播到所有客户端; - postman 只做了一个动作,给后端指定的 userId 发送一条 channel_user 频道的消息,并被指定客户端捕获;
4.1 html 测试代码以及说明
详细的 html 测试代码
<!DOCTYPE html> <html> <head> <meta http-equiv="Content-Type" content="text/html; charset=utf-8"/> <title>TestConnect</title> <base> <script src="https://cdn.bootcss.com/jquery/3.4.0/jquery.min.js"></script> <script src="https://cdn.bootcss.com/socket.io/2.0.3/socket.io.js"></script> <style> body { padding: 20px; } #console { height: 450px; overflow: auto; } .msg-color { color: green; } </style> </head> <body> <div id="console" class="well"></div> </body> <script type="text/javascript"> var socket; connect(); function connect() { var userId = 'zhanleai'; var opts = { query: 'userId=' + userId }; socket = io.connect('http://127.0.0.1:33000', opts); socket.on('connect', function () { console.log("连接成功"); output('当前用户是:' + userId ); output('<span class="msg-color">连接成功了。</span>'); }); socket.on('disconnect', function () { output('<span class="msg-color">下线了。 </span>'); }); socket.on('channel_user', function (data) { let msg= JSON.stringify(data) output('收到 channel_user 频道消息了:' + msg ); console.log(data); }); } function output(message) { var element = $("<div>" + message + "</div>"); $('#console').prepend(element); } </script> </html>
4.2 浏览器打开 html 文件,然后查看后端服务日志
(socket 服务端启动,端口号为 33000,客户端 zhanleai 连接上来了)
浏览器截图
后端服务日志截图
4.3 postman 工具测试
postman 截图
浏览器收到消息截图
到此这篇关于Springboot 集成 SocketIO的示例代码的文章就介绍到这了,更多相关Springboot 集成 SocketIO内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!