SpringBoot整合Netty+Websocket实现消息推送的示例代码
作者:爱生活,更爱技术
WebSocket使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据,本文主要介绍了SpringBoot整合Netty+Websocket实现消息推送的示例代码,具有一定的参考价值,感兴趣的可以了解一下
前言
Netty是一个高性能、异步事件驱动的网络应用框架,用于快速开发可维护的高性能协议服务器和客户端。以下是Netty的主要优势:
- 高性能:Netty基于NIO(非阻塞IO)模型,采用事件驱动的设计,具有高性能的特点。它通过零拷贝技术、内存池化技术等手段,进一步提高了IO性能,降低了资源消耗。
- 易用性:Netty提供了丰富的API和功能,如对TCP、UDP和文件传输的支持,以及对SSL/TLS、压缩、编解码等功能的内置实现。这些功能简化了网络应用的开发,降低了学习和使用的难度。
- 可扩展性:Netty采用了模块化设计,各个模块之间耦合度低,易于扩展。开发者可以根据需要定制和扩展Netty的功能,如添加新的编解码器、处理器或协议。
- 稳定性:Netty经过了大规模生产环境的验证,具有高稳定性。它通过合理的线程模型、资源管理和错误处理机制,保证了系统的稳定性和可靠性。
- 社区活跃:Netty拥有一个活跃的开源社区,不断有新的功能和优化被贡献出来。这为开发者提供了强大的支持,也促进了Netty的发展和完善。
- 跨平台性:Netty可以在多种操作系统和平台上运行,如Windows、Linux和Mac OS等。这一特性使得开发者可以轻松地在不同环境下部署和维护网络应用。
WebSocket 是一种网络通信协议,相比传统的HTTP协议,它具有以下优势:
- 实时性:WebSocket 允许服务器主动向客户端推送数据,从而实现实时通信,这对于需要即时反馈的应用(如在线游戏、聊天应用等)至关重要。
- 全双工通信:WebSocket 支持双向通信,服务器和客户端可以同时发送和接收数据,提高了通信的灵活性。
- 节省带宽:由于 WebSocket 在单个 TCP 连接上运行,避免了为每个消息创建新连接所需的额外开销,减少了数据传输量。
- 更好的二进制支持:WebSocket 定义了二进制帧,可以更轻松地处理二进制内容,如图片、音视频等。
- 跨域通信:WebSocket 支持跨域通信,使得客户端可以与不同域名的服务器进行通信,增加了应用的灵活性和可访问性。
- 可扩展性:WebSocket 定义了扩展机制,用户可以根据需要扩展协议或实现自定义的子协议。
- 更好的支持实时应用:由于 WebSocket 的实时性和全双工通信特性,它特别适合用于需要实时反馈的应用,例如在线游戏、实时音视频聊天等。
- 更快的传输速度:由于 WebSocket 减少了不必要的连接和状态转换,通信速度更快。
- 更低的延迟:由于 WebSocket 建立的是持久连接,减少了建立和关闭连接的开销,从而降低了通信延迟。
- 更强的兼容性:虽然 WebSocket 协议并未在所有浏览器中得到完全支持,但有各种库和框架可以帮助实现兼容性,例如通过 polyfill 技术。
说明:以下为SpringBoot整合Netty+Websocket实现实时的消息通讯
一、引入Netty依赖
<!--netty--> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.25.Final</version> </dependency>
二、 使用步骤
1.配置服务启动类
package com.pzg.chat.communication; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; @Slf4j @Component public class WebSocketNettyServer { @Autowired WebSocketChannelInitializer webSocketChannelInitializer; /** * Netty服务器启动对象 */ private final ServerBootstrap serverBootstrap = new ServerBootstrap();; @PostConstruct public void WebSocketNettyServerInit() { // 初始化服务器启动对象 // 主线程池 NioEventLoopGroup mainGrp = new NioEventLoopGroup(); // 从线程池 NioEventLoopGroup subGrp = new NioEventLoopGroup(); serverBootstrap // 指定使用上面创建的两个线程池 .group(mainGrp, subGrp) // 指定Netty通道类型 .channel(NioServerSocketChannel.class) // 指定通道初始化器用来加载当Channel收到事件消息后 .childHandler(webSocketChannelInitializer); } public void start() throws InterruptedException { // 绑定服务器端口,以异步的方式启动服务器 ChannelFuture future = serverBootstrap.bind("0.0.0.0",8089).sync(); if (future.isSuccess()){ log.info("netty初始化完成,端口8088"); } } }
说明:@PostConstruct用来保证容器初始化后触发该注解下的方法
2.Netty服务初始化器
package com.pzg.chat.communication; import com.pzg.chat.handler.ChatHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.stream.ChunkedWriteHandler; import io.netty.handler.timeout.IdleStateHandler; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class WebSocketChannelInitializer extends ChannelInitializer<SocketChannel> { @Autowired private ChatHandler chatHandler; @Override protected void initChannel(SocketChannel socketChannel) { //获取对应的管道 ChannelPipeline pipeline = socketChannel.pipeline(); pipeline //添加HTTP编码解码器 .addLast(new HttpServerCodec()) //添加对大数据流的支持 .addLast(new ChunkedWriteHandler()) //添加聚合器 .addLast(new HttpObjectAggregator(1024 * 64)) //设置websocket连接前缀前缀 //心跳检查(8秒) .addLast(new IdleStateHandler(8,0,0)) //添加自定义处理器 .addLast(new WebSocketServerProtocolHandler("/ws",null,true)) .addLast(chatHandler); } }
3.自定义处理器类
package com.pzg.chat.handler; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.pzg.chat.communication.context.impl.WebSocketContext; import com.pzg.chat.service.ChannelService; import io.netty.channel.*; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; import io.netty.handler.codec.http.websocketx.PingWebSocketFrame; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.util.concurrent.GlobalEventExecutor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; @ChannelHandler.Sharable @SuppressWarnings("all") @Component @Slf4j public class ChatHandler extends SimpleChannelInboundHandler<WebSocketFrame> { private static ChatHandler chatHandler; @Autowired private ChannelService channelService; @Autowired private WebSocketContext webSocketContext; @PostConstruct public void init() { chatHandler = this; } /** * 创建ChannelGroup对象存储所有连接的用户 */ private static final ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); /** * 有新消息时会调用这个方法 * * @param channelHandlerContext 上下文处理器 * @param textWebSocketFrame 文本 * @throws Exception */ @Override @SuppressWarnings("all") public void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception { System.out.println(frame.getClass()); if (frame instanceof FullHttpRequest) { } //判断是否为关闭事件 if (frame instanceof CloseWebSocketFrame) { ctx.channel().close(); return; } if (frame instanceof PingWebSocketFrame) { return; } if (frame instanceof TextWebSocketFrame) { TextWebSocketFrame textWebSocketFrame = (TextWebSocketFrame) frame; JSONObject jsonObject = JSON.parseObject(textWebSocketFrame.text()); webSocketContext.executeWebSocketContext(jsonObject,ctx.channel()); //类型转为(前后端达成的消息体) } //遍历出所有连接的通道 } /** * 有新的连接建立时 * * @param ctx * @throws Exception */ @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { //加入通道组 clients.add(ctx.channel()); } /** * 不活跃时会调用这个方法 * * @param ctx * @throws Exception */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { //移除出通道组 try { channelService.deleteBindUserIdAndIdChannel(ctx.channel().id().asShortText()); channelService.deleteChannelBindUserId(ctx.channel()); }catch (Exception e){ } clients.remove(ctx.channel()); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 获取参数 super.channelActive(ctx); } //检查客户端写心跳事件 @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { Channel channel = ctx.channel(); if (evt instanceof IdleStateEvent) { IdleStateEvent idleStateEvent = (IdleStateEvent) evt; if (idleStateEvent.state() == IdleState.READER_IDLE) { try { channelService.deleteBindUserIdAndIdChannel(ctx.channel().id().asShortText()); channelService.deleteChannelBindUserId(ctx.channel()); }catch (Exception e){ } clients.remove(channel); channel.close(); } } else { super.userEventTriggered(ctx, evt); } } }
说明:webSocketContext.executeWebSocketContext(jsonObject,ctx.channel()); 为自己定义的处理消息的类,textWebSocketFrame.text()为对应的消息,可自行处理
4.缓存用户Channel接口和对应实现类
1.接口
package com.pzg.chat.service; import io.netty.channel.Channel; public interface ChannelService { void setUserIdBindChannel(Channel channel); void setIdBindChannel(String id,Channel channel); void setChannelBindUserId(Channel channel); String getChannelBindUserId(Channel channel); void deleteChannelBindUserId(Channel channel); void deleteBindUserIdChannel(); void deleteBindIdChannel(String id); void setUserIdAndIdBindChannel(String id ,Channel channel); void deleteBindUserIdAndIdChannel(String id); Channel getUserIdChannel(String userId); Channel getIdBindChannel(String Id); }
2.实现类
package com.pzg.chat.service.impl; import com.pzg.chat.service.ChannelService; import com.pzg.chat.utils.UserUtil; import io.netty.channel.Channel; import org.springframework.stereotype.Service; import java.util.*; @Service public class ChannelServiceImpl implements ChannelService { //保存用户id和channel的映射 public static HashMap<String,Channel> userIdChannel = new HashMap<>(); //保存channelId和channel映射关系 public static HashMap<String,Channel> idChannel = new HashMap<>(); //保存channel和userID映射关系 public static HashMap<Channel,String> ChannelUserId = new HashMap<>(); @Override public void setUserIdBindChannel(Channel channel) { String userId = String.valueOf(UserUtil.getUserDetailsDTO().getId()); userIdChannel.put(userId,channel); } @Override public void setIdBindChannel(String id, Channel channel) { idChannel.put(id,channel); } @Override public void setChannelBindUserId(Channel channel) { String userId = String.valueOf(UserUtil.getUserDetailsDTO().getId()); System.out.println("----------------------->"+userId); ChannelUserId.put(channel,userId); } @Override public String getChannelBindUserId(Channel channel) { return ChannelUserId.get(channel); } @Override public void deleteChannelBindUserId(Channel channel) { ChannelUserId.remove(channel); } @Override public void deleteBindUserIdChannel() { String userId = String.valueOf(UserUtil.getUserDetailsDTO().getId()); userIdChannel.remove(userId); } @Override public void deleteBindIdChannel(String id) { idChannel.remove(id); } @Override public void setUserIdAndIdBindChannel(String id, Channel channel) { setUserIdBindChannel(channel); setIdBindChannel(id,channel); } @Override public void deleteBindUserIdAndIdChannel(String id) { deleteBindIdChannel(id); deleteBindUserIdChannel(); } @Override public Channel getUserIdChannel(String userId) { return userIdChannel.get(userId); } @Override public Channel getIdBindChannel(String Id) { return idChannel.get(Id); } }
说明:缓存Channel主要保证消息能发送到对应的Channel中,消息可携带用户id通过id查找Channel,将信息存入即可 ,通过channel.writeAndFlush(new TextWebSocketFrame("消息内容"));刷出消息。
5.调用start()启动Netty服务
package com.pzg.chat.listener; import com.pzg.chat.communication.WebSocketNettyServer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationListener; import org.springframework.context.event.ContextRefreshedEvent; import org.springframework.stereotype.Component; @Component public class NettyStartListener implements ApplicationListener<ContextRefreshedEvent> { /** * 注入启动器 */ @Autowired private WebSocketNettyServer webSocketNettyServer; @Override public void onApplicationEvent(ContextRefreshedEvent event) { //判断event上下文中的父级是否为空 if (event.getApplicationContext().getParent() == null) { try { //为空则调用start方法 webSocketNettyServer.start(); } catch (Exception e) { e.printStackTrace(); } } } }
6.Websocket配置
// 导出socket对象 import {getToken} from '@/utils/auth'; export { socket } import { Message } from 'element-ui' import {header} from "../../listening/header"; import {asidefriend} from "../../listening/asidefriend"; import {chatbox} from "../../listening/chatbox"; import {chatcontent} from "../../listening/chatcontent"; import {videocalls} from "../../listening/videocalls"; import {voicecalls} from "../../listening/voicecalls"; // socket主要对象 var socket = { websock: null, ws_url: "ws://localhost:8089/ws", /** * 开启标识 * */ socket_open: false, /** * 心跳timer * */ hearbeat_timer: null, /** * 心跳发送频率 * */ hearbeat_interval: 5000, /** * 是否开启重连 * */ is_reonnect: true, /** * 重新连接的次数 * */ reconnect_count: 3, /** * 当前重新连接的次数,默认为:1 * */ reconnect_current: 1, /** * 重新连接的时间类型 * */ reconnect_timer: null, /** * 重新连接的间隔 * */ reconnect_interval: 3000, i : 0, timer:null, /** * 初始化连接 */ init: () => { if (!("WebSocket" in window)) { Message({ message: '当前浏览器与网站不兼容丫', type: 'error', }); return null } if (socket.websock && socket.websock.readyState===1) { return socket.websock } socket.websock = new WebSocket(socket.ws_url) //接收消息 socket.websock.onmessage = function (e) { //调用处理消息的方法 socket.receive(e) } // 关闭连接 socket.websock.onclose = function (e) { clearInterval(socket.hearbeat_interval); socket.socket_open = false; if (socket.websock!=null){ header.getWebsocketStatus(socket.websock.readyState); } // 需要重新连接 if (socket.is_reonnect) { socket.reconnect_timer = setTimeout(() => { // 超过重连次数 if (socket.reconnect_current > socket.reconnect_count) { clearTimeout(socket.reconnect_timer) return } // 记录重连次数 socket.reconnect_current++ socket.reconnect() }, socket.reconnect_interval) } } // 连接成功 socket.websock.onopen = function () { // Message({ // message: '连接成功', // type: 'success', // }); header.getWebsocketStatus(socket.websock.readyState); let data = { "action": 10002, "token":getToken(), "chatMsg": null, "extend": 1, }; socket.send(data); socket.socket_open = true; socket.is_reonnect = true; //重修刷新好友内容 window.dispatchEvent(new CustomEvent('connectInit')); // 开启心跳 socket.heartbeat() }; // 连接发生错误 socket.websock.onerror = function (err) { Message({ message: '服务连接发送错误!', type: 'error', }); } }, /** * 获取websocket对象 * */ getSocket:()=>{ //创建了直接返回,反之重来 if (socket.websock) { return socket.websock }else { socket.init(); } }, getStatus:()=> { if (socket.websock.readyState === 0) { return "未连接"; } else if (socket.websock.readyState === 1) { return "已连接"; } else if (socket.websock.readyState === 2) { return "连接正在关闭"; } else if (socket.websock.readyState === 3) { return "连接已关闭"; } }, /** * 发送消息 * @param {*} data 发送数据 * @param {*} callback 发送后的自定义回调函数 */ send: (data, callback = null) => { // 开启状态直接发送 if (socket.websock!=null && socket.websock.readyState === socket.websock.OPEN) { try { socket.websock.send(JSON.stringify(data)); }catch (e) { if (socket.timer !=null){ return } socket.timer = setInterval(()=>{ if (i>=6){ clearInterval(socket.timer); socket.timer = null; socket.i = 0; return } socket.websock.send(JSON.stringify(data)); socket.i++; },2000) } if (callback) { callback() } // 正在开启状态,则等待1s后重新调用 } else if (socket.websock!=null && socket.websock.readyState === socket.websock.CONNECTING) { setTimeout(function () { socket.send(data, callback) }, 1000) // 未开启,则等待1s后重新调用 } else if (socket.websock!=null){ socket.init(); setTimeout(function () { socket.send(data, callback) }, 1000) } }, /** * 接收消息 * @param {*} message 接收到的消息 */ receive: (message) => { var recData = JSON.parse(message.data); /** *这部分是我们具体的对消息的处理 * */ console.log(recData) // 自行扩展其他业务处理... }, /** * 心跳 */ heartbeat: () => { if (socket.hearbeat_timer) { clearInterval(socket.hearbeat_timer) } socket.hearbeat_timer = setInterval(() => { //发送心跳包 let data = { "action": 10000, "token":getToken(), "chatMsg": null, "extend": null, }; socket.send(data) }, socket.hearbeat_interval) }, /** * 主动关闭连接 */ close: () => { if (socket.websock==null){ return } let data = { "action": 10002, "token":getToken(), "chatMsg": null, "extend": 0, }; socket.send(data); clearInterval(socket.hearbeat_interval); socket.is_reonnect = false; socket.websock.close(); header.getWebsocketStatus(socket.websock.readyState); socket.websock=null }, /** * 重新连接 */ reconnect: () => { if (socket.websock && socket.socket_open) { socket.websock.close() } socket.init() }, }
说明:通过登入后,在某个全局页面中调用socket.start()即可连接netty服务器,通过socket.send("消息")来发送消息。
三、结束语
到此这篇关于SpringBoot整合Netty+Websocket实现消息推送的示例代码的文章就介绍到这了,更多相关SpringBoot Netty Websocket消息推送内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!