SpringCloud整合Netty集群实现WebSocket的示例代码
作者:黑加菲
引言
在分布式系统中,Spring Cloud 为微服务架构提供了丰富的功能,而 Netty 是一个高性能的网络通信框架。将二者结合实现 Socket 集群,可以满足大规模、高性能网络通信的需求,实现前后端间高效稳定的通信。
1. 服务注册和发现中心
这里服务注册和发行中心使用nacos为例(需要启动一个nacos服务器)。
微服务注册: 在每一个微服务项目中,添加Nacos客户端连接,并在配置文件中指定服务名称和端口。例如:
# Tomcat server: port: 9201 netty: port: 10201 application: name: yhy-netty-server # Spring spring: application: # 应用名称 name: soc-dmoasp-system profiles: # 环境配置 active: dev cloud: nacos: discovery: # 服务注册地址 server-addr: nacos-registry:8858 config: # 配置中心地址 server-addr: nacos-registry:8858 file-extension: yml # 共享配置 shared-configs: - data-id: application.${spring.cloud.nacos.config.file-extension} refresh: true - data-id: soc-dmoasp-redission.${spring.cloud.nacos.config.file-extension} - data-id: soc-dmoasp-druid.${spring.cloud.nacos.config.file-extension}
这是一个基本的服务配置。里面关于netty的applicaiton.name和port可以通过Nacos的NamingService类手动注册。
1.1. Netty服务器搭建
- 添加Netty依赖:在具体微服务中的
pom.xm
l中添加Netty依赖:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-reactor-netty</artifactId> </dependency>
- Netty启动类:创建一个NettyServer类,用于启动Netty,示例如下:
import com.alibaba.cloud.nacos.NacosDiscoveryProperties; import com.alibaba.nacos.api.PropertyKeyConst; import com.alibaba.nacos.api.naming.NamingFactory; import com.alibaba.nacos.api.naming.NamingService; import com.alibaba.nacos.api.naming.pojo.Instance; import com.soc.dmoasp.system.server.handler.WebSocketIdleStateHandler; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.stream.ChunkedWriteHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.CommandLineRunner; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; import javax.annotation.PreDestroy; import java.net.InetAddress; import java.util.Properties; /** * Netty服务 * @author dsw * @date 2024/11/18 17:34 */ @Component public class NettyServer implements CommandLineRunner { Logger log = LoggerFactory.getLogger(NettyServer.class); @Autowired private NacosDiscoveryProperties nacosDiscoveryProperties; @Value("${server.netty.port}") private Integer nettyPort; @Value("${server.netty.application.name}") private String nettyName; EventLoopGroup bossGroup; EventLoopGroup workGroup; @Override public void run(String... args) throws Exception { log.info("初始化netty配置开始"); //netty 服务端启动的端口不可和Springboot启动类的端口号重复 this.start(); //关闭服务器的时候同时关闭Netty服务 Runtime.getRuntime().addShutdownHook(new Thread(() -> { try { this.destroy(); } catch (InterruptedException e) { log.error(e.getMessage()); } })); } @Async public void start() throws InterruptedException { try { bossGroup = new NioEventLoopGroup(1); workGroup = new NioEventLoopGroup(10); ServerBootstrap bootstrap = new ServerBootstrap(); // bossGroup辅助客户端的tcp连接请求, workGroup负责与客户端之前的读写操作 bootstrap.group(bossGroup, workGroup) // 指定Channel .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline //HTTP解码 .addLast(new HttpServerCodec()) .addLast(new ChunkedWriteHandler()) //HTTP段聚合 .addLast(new HttpObjectAggregator(1024*1024)) //将HTTP协议转成ws协议 .addLast(new WebSocketServerProtocolHandler("/socket")) ; } }); registerNamingService(nettyName,nettyPort); // 配置完成,开始绑定server,通过调用sync同步方法阻塞直到绑定成功 ChannelFuture future = bootstrap.bind(nettyPort).sync(); if (future.isSuccess()) { log.info("Server started and listen on:{}", future.channel().localAddress()); log.info("启动 Netty Server"); } } catch (InterruptedException e) { log.error("netty异常:{}", e.getMessage()); } } /** * 将Netty服务注册进Nacos * * @param nettyName 服务名称 * @param nettyPort 服务端口号 */ private void registerNamingService(String nettyName, Integer nettyPort) { try { Properties properties = new Properties(); properties.setProperty(PropertyKeyConst.SERVER_ADDR, nacosDiscoveryProperties.getServerAddr()); properties.setProperty(PropertyKeyConst.NAMESPACE, nacosDiscoveryProperties.getNamespace()); properties.setProperty(PropertyKeyConst.USERNAME, nacosDiscoveryProperties.getUsername()); properties.setProperty(PropertyKeyConst.PASSWORD, nacosDiscoveryProperties.getPassword()); NamingService namingService = NamingFactory.createNamingService(properties); InetAddress address = InetAddress.getLocalHost(); // 定义服务实例信息 Instance instance = new Instance(); instance.setIp(address.getHostAddress()); instance.setPort(nettyPort); instance.setWeight(1.0); instance.setHealthy(true); namingService.registerInstance(nettyName, nacosDiscoveryProperties.getGroup(), instance); } catch (Exception e) { throw new RuntimeException(e); } } /** * 释放资源 */ @PreDestroy public void destroy() throws InterruptedException { if (bossGroup != null) { bossGroup.shutdownGracefully().sync(); } if (workGroup != null) { workGroup.shutdownGracefully().sync(); } log.info("关闭Netty"); } }
使用CommandLineRunner接口实现run方法在启动项目的时候把Netty服务带起。
bossGroup
和 workGroup
的角色区别
bossGroup(老板组)
主要职责是负责监听服务器端的端口,等待新的客户端连接请求到来。它就像是公司里负责接待新客户的前台人员,当有新客户(客户端)想要连接到服务器时,
bossGroup
中的EventLoop
会接收到这个连接请求。一般情况下,
bossGroup
只需要配置较少数量的EventLoop
就可以满足需求,因为它主要处理的是连接建立的初期阶段,即接受新连接这个相对不那么频繁的操作(相比于后续处理大量数据传输等操作)。通常会设置为 1 个EventLoop
或者根据服务器的具体性能和预期的连接请求频率适当增加数量,但总体数量相对较少。workGroup(工作组)
一旦
bossGroup
接受了新的客户端连接,就会把这个新连接交给workGroup
来进一步处理后续的所有与该连接相关的操作,比如读取客户端发送的数据、向客户端发送响应数据等。它就像是公司里负责具体为客户办理业务的工作人员。workGroup
需要处理大量的实际业务数据传输和交互工作,所以通常会根据服务器的性能和预期要处理的并发连接数量等因素,配置相对较多数量的EventLoop
。例如,在处理高并发场景时,可能会配置几十甚至上百个EventLoop
来确保能够高效地处理众多客户端连接的各种业务操作。
registerNamingService方法
这时候可以看到我们Nacos配置中配置了server.netty.port
和server.netty.application.name
这两个参数分别对应netty的端口和netty的微服务应用名。
registerNamingService方法用于往Nacos中注册服务,这里通过NamingService
类的registerInstance方法
将netty服务注册进Nacos中。
1.2. Gateway网关转发
微服务中所有的请求都是由网关转发,这里使用Gateway转发。
# spring配置 spring: cloud: gateway: discovery: locator: lowerCaseServiceId: true enabled: true routes: # 系统模块 - id: soc-dmoasp-system uri: lb://soc-dmoasp-system predicates: - Path=/system/** filters: - StripPrefix=1 # netty服务 - id: netty-server uri: lb:ws://soc-netty-server predicates: - Path=/netty-server/** filters: - StripPrefix=1 #不需要进行权限校验的uri security: ignore: whites: - /auth/logout - /auth/login - /auth/register - /*/v2/api-docs - /csrf #netty连接地址 - /netty-server/**
配置文件中添加Netty路由,在鉴权网关中需要将socket地址放行,不进行权限验证。例如:
@Component @RefreshScope public class AuthFilter implements GlobalFilter, Ordered { private static final Logger log = LoggerFactory.getLogger(AuthFilter.class); // 排除过滤的 uri 地址,nacos自行添加 @Autowired private IgnoreWhiteProperties ignoreWhite; @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { ServerHttpRequest request = exchange.getRequest(); ServerHttpRequest.Builder mutate = request.mutate(); String url = request.getURI().getPath(); // 跳过不需要验证的路径 if (StringUtils.matches(url, ignoreWhite.getWhites())) { return chain.filter(exchange); } ...... } }
启动Gateway和System模块
启动完成后System模块会打印NettyServer输出的启动日志,Nacos中也会有手动注册的Netty服务。
通过ws://127.0.0.1:8080/netty-server/socket就可以直接连接上Netty服务器(8080为Gateway的端口)。
2. 鉴权、心跳、客户端与服务端之间的通信
2.1. 鉴权
创建AuthHandler类,继承SimpleChannelInboundHandler
类重写channelRead0
方法,channelRead0
中可以监听到客户端往服务端发送的消息。 例如:
import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; import com.soc.dmoasp.common.core.constant.CacheConstants; import com.soc.dmoasp.common.core.constant.TokenConstants; import com.soc.dmoasp.common.core.enums.NettyMsgEnum; import com.soc.dmoasp.common.core.utils.JwtUtils; import com.soc.dmoasp.common.core.utils.StringUtils; import com.soc.dmoasp.common.redis.service.RedisService; import com.soc.dmoasp.system.server.vo.NettyResult; import io.jsonwebtoken.Claims; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.util.AttributeKey; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.data.redis.core.RedisTemplate; /** * netty鉴权处理 * @author dsw * @date 2024/11/18 17:55 */ public class AuthHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { Logger log = LoggerFactory.getLogger(AuthHandler.class); private final RedisTemplate<String, Object> redisTemplate; private final RedisService redisService; public AuthHandler(RedisTemplate<String, Object> stringObjectRedisTemplate, RedisService redisService) { redisTemplate = stringObjectRedisTemplate; this.redisService = redisService; } @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame textWebSocketFrame) throws Exception { try { JSONObject clientMessage = JSON.parseObject(textWebSocketFrame.text()); //获取code 只判断code为鉴权的消息类型 Integer code = clientMessage.getInteger("code"); if (NettyMsgEnum.AUTH_MESSAGE.getCode().equals(code)) { //获取token String token = clientMessage.getString("token"); if (StringUtils.isEmpty(token)) { ctx.channel().writeAndFlush(NettyResult.authFail("令牌不能为空")); ctx.close(); } // 如果前端设置了令牌前缀,则裁剪掉前缀 if (StringUtils.isNotEmpty(token) && token.startsWith(TokenConstants.PREFIX)) { token = token.replaceFirst(TokenConstants.PREFIX, StringUtils.EMPTY); } //JWT校验 Claims claims = JwtUtils.parseToken(token); if (claims == null) { ctx.channel().writeAndFlush(NettyResult.authFail("令牌已过期或验证不正确")); ctx.close(); } String userkey = JwtUtils.getUserKey(claims); //从Redis中查看是否有这个Token没有则不能登录 boolean islogin = redisService.hasKey(getTokenKey(userkey)); if (!islogin) { ctx.channel().writeAndFlush(NettyResult.authFail("登录状态已过期")); ctx.close(); } //获取用户保存至Socket连接会话中 String userId = JwtUtils.getUserId(claims); AttributeKey<String> userIdKey = AttributeKey.valueOf("userId"); ctx.channel().attr(userIdKey).setIfAbsent(userId); JSONObject jsonObject = new JSONObject(); jsonObject.put("userId",userId); log.info("有新的Socket客户端链接 userId :{}", userId); //将连接信息保存至Redis中key为userId value为ctx.channel().id() redisTemplate.opsForHash().put(CacheConstants.getUserChannelKey(),userId,ctx.channel().id()); ctx.channel().writeAndFlush(NettyResult.success(NettyMsgEnum.AUTH_MESSAGE.getCode(), "鉴权成功", jsonObject)); //鉴权完成后移除AuthHandler消息监听 ctx.pipeline().remove(AuthHandler.class); } else { ctx.channel().writeAndFlush(NettyResult.authFail("请先鉴权,在发送其他类型请求!")); ctx.close(); } } catch (Exception e) { log.error(e.getMessage()); ctx.channel().writeAndFlush(NettyResult.authFail("鉴权失败")); ctx.close(); } } /** * 获取缓存key */ private String getTokenKey(String token) { return CacheConstants.LOGIN_TOKEN_KEY + token; } }
泛型TextWebSocketFrame
表示接收文本类型的消息。
其中连接的用户信息保存到Redis中,RedisTemplate<String, Object> redisTemplate
对象是用来保存Netty连接信息的,序列化使用的是String(用户信息用String存储,使用JSON序列化会反序列化失败),对应接收的JSON串如下:
{"code":1001,token:"Bearer XXXX"}
code取NettyMsgEnum中的code,token则是登录时生成的token令牌。
鉴权后将AuthHandler移除,会话后续的消息交互不在进AuthHandler。
NettyMsgEnum如下:
/** * netty消息类型枚举 * @author dsw * @date 2024/11/18 17:58 */ public enum NettyMsgEnum { AUTH_MESSAGE(1001, "鉴权消息","Auth-Netty"), //{'code':1003,'data':{'unreadCount':0}} NOTICE_MESSAGE(1003, "公告通知消息","Notice-Netty"), HEART_MESSAGE(1006, "心跳消息","Heart-Netty"), ERROR_MESSAGE(-1, "错误",null); private final Integer code; private final String info; private final String strategyName; NettyMsgEnum(Integer code, String info, String strategyName){ this.code = code; this.info = info; this.strategyName = strategyName; } public static NettyMsgEnum getByCode(Integer code) { for (NettyMsgEnum msgEnum : values()) { if (msgEnum.getCode().equals(code)) { return msgEnum; } } return ERROR_MESSAGE; } public Integer getCode() { return code; } public String getInfo() { return info; } public String getStrategyName() { return strategyName; } }
NettyResult如下:
import com.alibaba.fastjson2.JSON; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import java.io.Serializable; /** * netty响应实体 * @author dsw * @date 2024/11/18 18:02 */ public class NettyResult implements Serializable { private static final long serialVersionUID = 1L; private Integer code; private String message; private Object data; public NettyResult(Integer code, String message, Object data) { this.code = code; this.message = message; this.data = data; } public static TextWebSocketFrame fail(String message) { return new TextWebSocketFrame(JSON.toJSONString(new NettyResult(-1, message, null))); } public static TextWebSocketFrame authFail(String message) { return new TextWebSocketFrame(JSON.toJSONString(new NettyResult(-2, message, null))); } public static TextWebSocketFrame success( String message) { return new TextWebSocketFrame(JSON.toJSONString(new NettyResult(200, message, null))); } public static TextWebSocketFrame success(Integer code, Object data) { return new TextWebSocketFrame(JSON.toJSONString(new NettyResult(code,null, data))); } public static TextWebSocketFrame success(Integer code, String message, Object data) { return new TextWebSocketFrame(JSON.toJSONString(new NettyResult(code,message, data))); } public Integer getCode() { return code; } public String getMessage() { return message; } public Object getData() { return data; } }
最后到NettyServer中的ChannelInitializer加入AuthHandler:
我们重新项目后连接socket查看结果
如果不鉴权直接发送消息,服务端会主动断开连接,客户端需要重连。
这就代表已经连接成功了。
2.2. 空闲检测
创建WebSocketIdleStateHandler
类继承IdleStateHandler
类,重写channelIdle
方法。
import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.TimeUnit; /** * 空闲检测 * @author dsw * @date 2024/11/18 11:47 */ public class WebSocketIdleStateHandler extends IdleStateHandler { Logger log = LoggerFactory.getLogger(WebSocketIdleStateHandler.class); /** * 默认的读空闲时间 */ private static final int DEFAULT_READER_IDLE_TIME = 10; /** * 默认10秒读空闲断开客户端 */ public WebSocketIdleStateHandler() { super(DEFAULT_READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS); } /** * 指定心跳时间(秒) * * @param readerIdleTimeSeconds 读空闲时间 * @param writerIdleTimeSeconds 写空闲时间 * @param allIdleTimeSeconds 读写空闲时间 */ public WebSocketIdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) { super(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds, TimeUnit.SECONDS); } /** * 指定心跳时间及时间单位 * * @param readerIdleTime 读空闲时间 * @param writerIdleTime 写空闲时间 * @param allIdleTime 读写空闲时间 * @param unit 时间单位 */ public WebSocketIdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) { super(readerIdleTime, writerIdleTime, allIdleTime, unit); } /** * 当空闲事件触发时执行 */ @Override protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception { //如果是读空闲 if (evt.state().equals(IdleState.READER_IDLE)) { Channel channel = ctx.channel(); log.debug("服务端未检测到客户端【{}】的心跳包,强制关闭客户端!", channel.id()); channel.close(); } super.channelIdle(ctx,evt); } }
以上实现了父类的构造函数,可以指定具体的空闲时间。当空闲时会触发channelIdle
方法,则服务端主动断开连接。
最后到NettyServer中的ChannelInitializer加入WebSocketIdleStateHandler:
加到最前面。
示例设置了10秒断开,需要使用中自行调整。
2.3. 消息通信
2.3.1. 接收客户端的消息
创建WebSocketHandler
类,继承SimpleChannelInboundHandler
类重写channelRead0、handlerAdded、handlerRemoved、exceptionCaught
方法。
channelRead0
方法:监听客户端发送过来的消息。handlerAdded
方法:websocket连接后会调用,将连接信息添加到通道组handlerRemoved
方法:断开连接后会调用(服务端、客户端断开都会调用),用于用户下线(删除通道、删除Redis中存储的连接信息)exceptionCaught
方法:发生异常后调用,发生异常后服务端通常会主动断开连接。
import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; import com.soc.dmoasp.common.core.constant.CacheConstants; import com.soc.dmoasp.common.core.enums.NettyMsgEnum; import com.soc.dmoasp.system.server.config.NettyConfig; import com.soc.dmoasp.system.server.strategy.NettyStrategy; import com.soc.dmoasp.system.server.strategy.NettyStrategyFactory; import com.soc.dmoasp.system.server.vo.NettyResult; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.util.AttributeKey; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.data.redis.core.RedisTemplate; /** * webSocket处理 * @author dsw * @date 2024/11/18 10:20 */ public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { Logger log = LoggerFactory.getLogger(WebSocketHandler.class); private final RedisTemplate<String, Object> redisTemplate; private final NettyStrategyFactory nettyStrategyFactory; public WebSocketHandler(RedisTemplate<String, Object> redisTemplate, NettyStrategyFactory nettyStrategyFactory) { this.redisTemplate = redisTemplate; this.nettyStrategyFactory = nettyStrategyFactory; } /** * webSocket连接创建后调用 */ @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { // 添加到channelGroup 通道组 NettyConfig.getChannelGroup().add(ctx.channel()); } /** * 读取数据 */ @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame frame) throws Exception { AttributeKey<String> userIdKey = AttributeKey.valueOf("userId"); String userId = ctx.channel().attr(userIdKey).get(); log.info("收到消息 userId:{} message:{}",userId,frame.text()); // 接收客户端的消息 JSONObject pullMessage = JSON.parseObject(frame.text()); Integer code = pullMessage.getInteger("code"); // 获取消息类型 NettyStrategy nettyStrategy = nettyStrategyFactory.getNettyStrategy(NettyMsgEnum.getByCode(code)); // 处理消息 TextWebSocketFrame pushMessage = nettyStrategy.execute(pullMessage); // 返回处理结果给客户端 ctx.channel().writeAndFlush(pushMessage); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { AttributeKey<String> userIdKey = AttributeKey.valueOf("userId"); String userId = ctx.channel().attr(userIdKey).get(); log.info("用户下线了 userId:{}",userId); // 删除通道 removeUserId(ctx); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.channel().writeAndFlush(NettyResult.fail("系统错误:" + cause.getMessage())); } /** * 删除用户与channel的对应关系 */ private void removeUserId(ChannelHandlerContext ctx) { AttributeKey<String> userIdKey = AttributeKey.valueOf("userId"); String userId = ctx.channel().attr(userIdKey).get(); if(StringUtils.isNotBlank(userId)){ redisTemplate.opsForHash().delete(CacheConstants.getUserChannelKey(),userId); } } }
这里收到消息后通过一个策略模式进入不同的策略,通过NettyMsgEnum
里面定义的code指定不同的策略类。
Strategy和StrategyFactory:
import com.alibaba.fastjson2.JSONObject; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; /** * Netty接收消息处理策略类 * @author dsw * @date 2024/5/27 10:21 */ public interface NettyStrategy { /** * 执行添加数值 * * @return */ TextWebSocketFrame execute(JSONObject message); }
import com.soc.dmoasp.common.core.enums.NettyMsgEnum; import org.slf4j.Logger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.Map; /** * Netty策略工厂 * @author dsw * @date 2024/11/18 10:20 */ @Component public class NettyStrategyFactory { Logger log = org.slf4j.LoggerFactory.getLogger(NettyStrategyFactory.class); /** * 通过Spring容器的方式注入 */ @Autowired private Map<String, NettyStrategy> nettyStrategy; /** * 获取对应策略类 * @param */ public NettyStrategy getNettyStrategy(NettyMsgEnum nettyMsgEnum){ if(!nettyStrategy.containsKey(nettyMsgEnum.getStrategyName())){ log.warn("没有对应的消息策略"); throw new RuntimeException("没有对应的消息策略"); } return nettyStrategy.get(nettyMsgEnum.getStrategyName()); } }
我们实现一个心跳消息的策略:
import com.alibaba.fastjson2.JSONObject; import com.soc.dmoasp.common.core.enums.NettyMsgEnum; import com.soc.dmoasp.system.server.strategy.NettyStrategy; import com.soc.dmoasp.system.server.vo.NettyResult; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import org.springframework.stereotype.Component; /** * Netty心跳消息 * @author dsw * @date 2024/5/27 10:25 */ @Component("Heart-Netty") public class HeartStrategyImpl implements NettyStrategy { @Override public TextWebSocketFrame execute(JSONObject message) { String data = message.getString("data"); if ("ping".equals(data)) { return NettyResult.success(NettyMsgEnum.HEART_MESSAGE.getCode(), null, "pong"); } return NettyResult.fail("消息格式不正确"); } }
添加至ChannelInitializer
后重启查看效果。
2.3.2. 发送消息给客户端
集群下面的Netty配置
方案1:使用Redis的发布订阅
方案2:使用MQ的发布订阅
我们这里使用使用Redis的发布订阅实现。
添加Redis订阅器:
import com.soc.dmoasp.common.core.constant.CacheConstants; import com.soc.dmoasp.common.redis.configure.FastJson2JsonRedisSerializer; import com.soc.dmoasp.system.server.receiver.PushMsgRedisReceiver; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; import java.util.Collections; import java.util.List; /** * redisReceiver 配置 * @author dsw * @date 2024/11/18 12:07 */ @Configuration public class RedisReceiverConfig { @Bean public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory, MessageListenerAdapter listenerAdapter) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(redisConnectionFactory); List<PatternTopic> topics = Collections.singletonList( PatternTopic.of(CacheConstants.Topic.SYS_SOCKET_PUSH_TOPIC) ); // 添加订阅者监听类,数量不限.PatternTopic定义监听主题,这里监听test-topic主题 container.addMessageListener(listenerAdapter, topics); return container; } @Bean @SuppressWarnings(value = { "unchecked", "rawtypes" }) public MessageListenerAdapter listenerAdapter(PushMsgRedisReceiver pushMsgRedisReceiver) { MessageListenerAdapter adapter = new MessageListenerAdapter(pushMsgRedisReceiver); FastJson2JsonRedisSerializer serializer = new FastJson2JsonRedisSerializer(Object.class); adapter.setSerializer(serializer); return adapter; } }
import com.soc.dmoasp.common.core.constant.CacheConstants; import com.soc.dmoasp.common.core.exception.Asserts; import com.soc.dmoasp.common.redis.dto.NettyMessage; import com.soc.dmoasp.system.server.config.NettyConfig; import com.soc.dmoasp.system.server.vo.NettyResult; import io.netty.channel.Channel; import io.netty.channel.ChannelId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import java.util.Objects; /** * Netty发送消息接收者 * @author dsw * @date 2024/11/18 12:07 */ @Component public class PushMsgRedisReceiver { Logger log = LoggerFactory.getLogger(PushMsgRedisReceiver.class); @Autowired private RedisTemplate<String,Object> stringObjectRedisTemplate; /** * RedisTopic订阅 * @param nettyMessage netty消息 * @param topic netty消息对应的topic */ public void handleMessage(NettyMessage nettyMessage, String topic) { Object channelId = stringObjectRedisTemplate.opsForHash().get(CacheConstants.getUserChannelKey(), nettyMessage.getUserId()); if (Objects.isNull(channelId)) { log.warn("推送消息失败,用户不在线! userId:{},msg:{}",nettyMessage.getUserId(),nettyMessage.getMessage()); Asserts.fail("推送消息失败,用户不在线!"); } Channel channel = NettyConfig.getChannelGroup().find((ChannelId) channelId); if(channel!=null){ channel.writeAndFlush(NettyResult.success(nettyMessage.getCode(),nettyMessage.getMessage())); log.info("推送消息成功! userId:{},msg:{}",nettyMessage.getUserId(),nettyMessage.getMessage()); }else { log.warn("推送消息失败,没有找到Channel! userId:{},msg:{}",nettyMessage.getUserId(),nettyMessage.getMessage()); } } }
发布订阅的机制就是所有集群都会收到消息,收到消息后每个netty集群都去找对应的消息会话通道,如果没找到则说明连接不到当前服务上,找到通道后则可以直接推送。 这里使用stringObjectRedisTemplate
获取用户通道,避免序列化失败。
RedisService中实现发布消息
/** * Redis消息发布订阅 发布消息 * @param channel 通道ID * @param message 消息 */ @Async public void convertAndSend(String channel, Object message) { redisTemplate.convertAndSend(channel, message); }
发布消息的工具类:
import com.alibaba.fastjson2.JSONObject; import com.soc.dmoasp.common.core.constant.CacheConstants; import com.soc.dmoasp.common.redis.dto.NettyMessage; import com.soc.dmoasp.common.redis.dto.PushSocketMsgDTO; import org.slf4j.Logger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; import java.util.Set; /** * Netty发送消息给服务端 * @author dsw * @date 2024/11/18 11:47 */ @Service public class PushSocketMsgService { Logger log = org.slf4j.LoggerFactory.getLogger(PushSocketMsgService.class); @Autowired private RedisTemplate<String, Object> stringObjectRedisTemplate; @Autowired private RedisService redisService; /** * 给所有用户发送消息 * @param msgDTO */ public void pushMsgToAll( PushSocketMsgDTO msgDTO) { Set<Object> keys = stringObjectRedisTemplate.opsForHash().keys(CacheConstants.getUserChannelKey()); keys.forEach(key -> this.pushMsgToUser(msgDTO.getCode(), key.toString(), msgDTO.getMessage())); } /** * 给指定用户发送消息 * @param msgDTO */ public void pushMsgToUserList(PushSocketMsgDTO msgDTO) { for(Long userId : msgDTO.getUserIdList()){ this.pushMsgToUser(msgDTO.getCode(), userId.toString(), msgDTO.getMessage()); } } protected void pushMsgToUser(Integer code, String userId, JSONObject message) { //推送到其他负载处理 NettyMessage nettyMessage = new NettyMessage(); nettyMessage.setUserId(userId); nettyMessage.setCode(code); nettyMessage.setMessage(message); redisService.convertAndSend(CacheConstants.Topic.SYS_SOCKET_PUSH_TOPIC, nettyMessage); log.info("推送消息成功! userId:{},message:{}", userId, message); } }
NettyMessage和PushSocketMsgDTO:
import com.alibaba.fastjson2.JSONObject; import lombok.Data; /** * Neety消息发布VO * @author dsw * @date 2024/11/18 13:49 */ @Data public class NettyMessage { private Integer code; private String userId; private JSONObject message; }
import com.alibaba.fastjson2.JSONObject; import lombok.Data; /** * 发送socket消息DTO * @author dsw * @date 2024/11/18 13:39 */ @Data public class PushSocketMsgDTO { /** * 消息类型 * 详情看 NettyMsgEnum */ private Integer code; /** * 用户ID */ private List<Long> userIdList; /** * 消息体 */ private JSONObject message; }
测试结果:
以上就是SpringCloud整合Netty集群实现WebSocket的示例代码的详细内容,更多关于SpringCloud Netty实现WebSocket的资料请关注脚本之家其它相关文章!